feat: add doc

This commit is contained in:
earayu 2025-05-23 11:52:06 +08:00
parent ada2443653
commit 8bafa49d5d
3 changed files with 566 additions and 0 deletions

View file

@ -0,0 +1,286 @@
# LightRAG Multi-Document Processing: Concurrent Control Strategy Analysis
LightRAG employs a multi-layered concurrent control strategy when processing multiple documents. This article provides an in-depth analysis of the concurrent control mechanisms at document level, chunk level, and LLM request level, helping you understand why specific concurrent behaviors occur.
## Overview
LightRAG's concurrent control is divided into three layers:
1. **Document-level concurrency**: Controls the number of documents processed simultaneously
2. **Chunk-level concurrency**: Controls the number of chunks processed simultaneously within a single document
3. **LLM request-level concurrency**: Controls the global concurrent number of LLM requests
## 1. Document-Level Concurrent Control
**Control Parameter**: `max_parallel_insert`
Document-level concurrency is controlled by the `max_parallel_insert` parameter, with a default value of 2.
```python
# lightrag/lightrag.py
max_parallel_insert: int = field(default=int(os.getenv("MAX_PARALLEL_INSERT", 2)))
```
### Implementation Mechanism
In the `apipeline_process_enqueue_documents` method, a semaphore is used to control document concurrency:
```python
# lightrag/lightrag.py - apipeline_process_enqueue_documents method
async def process_document(
doc_id: str,
status_doc: DocProcessingStatus,
split_by_character: str | None,
split_by_character_only: bool,
pipeline_status: dict,
pipeline_status_lock: asyncio.Lock,
semaphore: asyncio.Semaphore, # Document-level semaphore
) -> None:
"""Process single document"""
async with semaphore: # 🔥 Document-level concurrent control
# ... Process all chunks of a single document
# Create document-level semaphore
semaphore = asyncio.Semaphore(self.max_parallel_insert) # Default 2
# Create processing tasks for each document
doc_tasks = []
for doc_id, status_doc in to_process_docs.items():
doc_tasks.append(
process_document(
doc_id, status_doc, split_by_character, split_by_character_only,
pipeline_status, pipeline_status_lock, semaphore
)
)
# Wait for all documents to complete processing
await asyncio.gather(*doc_tasks)
```
## 2. Chunk-Level Concurrent Control
**Control Parameter**: `llm_model_max_async`
**Key Point**: Each document independently creates its own chunk semaphore!
```python
# lightrag/lightrag.py
llm_model_max_async: int = field(default=int(os.getenv("MAX_ASYNC", 4)))
```
### Implementation Mechanism
In the `extract_entities` function, **each document independently creates** its own chunk semaphore:
```python
# lightrag/operate.py - extract_entities function
async def extract_entities(chunks: dict[str, TextChunkSchema], global_config: dict[str, str], ...):
# 🔥 Key: Each document independently creates this semaphore!
llm_model_max_async = global_config.get("llm_model_max_async", 4)
semaphore = asyncio.Semaphore(llm_model_max_async) # Chunk semaphore for each document
async def _process_with_semaphore(chunk):
async with semaphore: # 🔥 Chunk concurrent control within document
return await _process_single_content(chunk)
# Create tasks for each chunk
tasks = []
for c in ordered_chunks:
task = asyncio.create_task(_process_with_semaphore(c))
tasks.append(task)
# Wait for all chunks to complete processing
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
chunk_results = [task.result() for task in tasks]
return chunk_results
```
### Important Inference: System Overall Chunk Concurrency
Since each document independently creates chunk semaphores, the theoretical chunk concurrency of the system is:
**Theoretical Chunk Concurrency = max_parallel_insert × llm_model_max_async**
For example:
- `max_parallel_insert = 2` (process 2 documents simultaneously)
- `llm_model_max_async = 4` (maximum 4 chunk concurrency per document)
- **Theoretical result**: Maximum 2 × 4 = 8 chunks simultaneously in "processing" state
## 3. LLM Request-Level Concurrent Control (The Real Bottleneck)
**Control Parameter**: `llm_model_max_async` (globally shared)
**Key**: Although there might be 8 chunks "in processing", all LLM requests share the same global priority queue!
```python
# lightrag/lightrag.py - __post_init__ method
self.llm_model_func = priority_limit_async_func_call(self.llm_model_max_async)(
partial(
self.llm_model_func,
hashing_kv=hashing_kv,
**self.llm_model_kwargs,
)
)
# 🔥 Global LLM queue size = llm_model_max_async = 4
```
### Priority Queue Implementation
```python
# lightrag/utils.py - priority_limit_async_func_call function
def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
def final_decro(func):
queue = asyncio.PriorityQueue(maxsize=max_queue_size)
tasks = set()
async def worker():
"""Worker that processes tasks in the priority queue"""
while not shutdown_event.is_set():
try:
priority, count, future, args, kwargs = await asyncio.wait_for(queue.get(), timeout=1.0)
result = await func(*args, **kwargs) # 🔥 Actual LLM call
if not future.done():
future.set_result(result)
except Exception as e:
# Error handling...
finally:
queue.task_done()
# 🔥 Create fixed number of workers (max_size), this is the real concurrency limit
for _ in range(max_size):
task = asyncio.create_task(worker())
tasks.add(task)
```
## 4. Chunk Internal Processing Mechanism (Serial)
### Why Serial?
Internal processing of each chunk strictly follows this serial execution order:
```python
# lightrag/operate.py - _process_single_content function
async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
# Step 1: Initial entity extraction
hint_prompt = entity_extract_prompt.format(**{**context_base, "input_text": content})
final_result = await use_llm_func_with_cache(hint_prompt, use_llm_func, ...)
# Process initial extraction results
maybe_nodes, maybe_edges = await _process_extraction_result(final_result, chunk_key, file_path)
# Step 2: Gleaning phase
for now_glean_index in range(entity_extract_max_gleaning):
# 🔥 Serial wait for gleaning results
glean_result = await use_llm_func_with_cache(
continue_prompt, use_llm_func,
llm_response_cache=llm_response_cache,
history_messages=history, cache_type="extract"
)
# Process gleaning results
glean_nodes, glean_edges = await _process_extraction_result(glean_result, chunk_key, file_path)
# Merge results...
# Step 3: Determine whether to continue loop
if now_glean_index == entity_extract_max_gleaning - 1:
break
# 🔥 Serial wait for loop decision results
if_loop_result = await use_llm_func_with_cache(
if_loop_prompt, use_llm_func,
llm_response_cache=llm_response_cache,
history_messages=history, cache_type="extract"
)
if if_loop_result.strip().strip('"').strip("'").lower() != "yes":
break
return maybe_nodes, maybe_edges
```
## 5. Complete Concurrent Hierarchy Diagram
![lightrag_indexing.png](assets%2Flightrag_indexing.png)
### Chunk Internal Processing (Serial)
```
Initial Extraction → Gleaning → Loop Decision → Complete
```
## 6. Real-World Scenario Analysis
### Scenario 1: Single Document with Multiple Chunks
Assume 1 document with 6 chunks:
- **Document level**: Only 1 document, not limited by `max_parallel_insert`
- **Chunk level**: Maximum 4 chunks processed simultaneously (limited by `llm_model_max_async=4`)
- **LLM level**: Global maximum 4 LLM requests concurrent
**Expected behavior**: 4 chunks process concurrently, remaining 2 chunks wait.
### Scenario 2: Multiple Documents with Multiple Chunks
Assume 3 documents, each with 10 chunks:
- **Document level**: Maximum 2 documents processed simultaneously
- **Chunk level**: Maximum 4 chunks per document processed simultaneously
- **Theoretical Chunk concurrency**: 2 × 4 = 8 chunks processed simultaneously
- **Actual LLM concurrency**: Only 4 LLM requests actually execute
**Actual state distribution**:
```
# Possible system state:
Document 1: 4 chunks "processing" (2 executing LLM, 2 waiting for LLM response)
Document 2: 4 chunks "processing" (2 executing LLM, 2 waiting for LLM response)
Document 3: Waiting for document-level semaphore
Total:
- 8 chunks in "processing" state
- 4 LLM requests actually executing
- 4 chunks waiting for LLM response
```
### Scenario 3: Resource Bottleneck Analysis
**Timeline Example (max_parallel_insert=2, llm_model_max_async=4)**:
![lightrag_llm_request_timeline.png](assets%2Flightrag_llm_request_timeline.png)
## 7. Performance Optimization Recommendations
### Understanding the Bottleneck
The real bottleneck is the global LLM queue, not the chunk semaphores!
### Adjustment Strategies
**Strategy 1: Increase LLM Concurrent Capacity**
```bash
# Environment variable configuration
export MAX_PARALLEL_INSERT=2 # Keep document concurrency
export MAX_ASYNC=8 # 🔥 Increase LLM request concurrency
```
**Strategy 2: Balance Document and LLM Concurrency**
```python
rag = LightRAG(
max_parallel_insert=3, # Moderately increase document concurrency
llm_model_max_async=12, # Significantly increase LLM concurrency
entity_extract_max_gleaning=0, # Reduce serial steps within chunks
)
```
## 8. Summary
Key characteristics of LightRAG's multi-document concurrent processing mechanism:
### Concurrent Layers
1. **Inter-document competition**: Controlled by `max_parallel_insert`, default 2 documents concurrent
2. **Theoretical Chunk concurrency**: Each document independently creates semaphores, total = max_parallel_insert × llm_model_max_async
3. **Actual LLM concurrency**: All chunks share global LLM queue, controlled by `llm_model_max_async`
4. **Intra-chunk serial**: Multiple LLM requests within each chunk execute strictly serially
### Key Insights
- **Theoretical vs Actual**: System may have many chunks "in processing", but only few are actually executing LLM requests
- **Real Bottleneck**: Global LLM request queue is the performance bottleneck, not chunk semaphores
- **Optimization Focus**: Increasing `llm_model_max_async` is more effective than increasing `max_parallel_insert`

Binary file not shown.

After

Width:  |  Height:  |  Size: 118 KiB

View file

@ -0,0 +1,280 @@
# LightRAG 多文档并发控制机制详解
LightRAG 在处理多个文档时采用了多层次的并发控制策略。本文将深入分析文档级别、chunk级别和LLM请求级别的并发控制机制帮助您理解为什么会出现特定的并发行为。
## 概述
LightRAG 的并发控制分为三个层次:
1. 文档级别并发:控制同时处理的文档数量
2. Chunk级别并发控制单个文档内同时处理的chunk数量
3. LLM请求级别并发控制全局LLM请求的并发数量
## 1. 文档级别并发控制
**控制参数**`max_parallel_insert`
文档级别的并发由 `max_parallel_insert` 参数控制默认值为2。
```python
# lightrag/lightrag.py
max_parallel_insert: int = field(default=int(os.getenv("MAX_PARALLEL_INSERT", 2)))
```
### 实现机制
`apipeline_process_enqueue_documents` 方法中,使用信号量控制文档并发:
```python
# lightrag/lightrag.py - apipeline_process_enqueue_documents方法
async def process_document(
doc_id: str,
status_doc: DocProcessingStatus,
split_by_character: str | None,
split_by_character_only: bool,
pipeline_status: dict,
pipeline_status_lock: asyncio.Lock,
semaphore: asyncio.Semaphore, # 文档级别信号量
) -> None:
"""Process single document"""
async with semaphore: # 🔥 文档级别并发控制
# ... 处理单个文档的所有chunks
# 创建文档级别信号量
semaphore = asyncio.Semaphore(self.max_parallel_insert) # 默认2
# 为每个文档创建处理任务
doc_tasks = []
for doc_id, status_doc in to_process_docs.items():
doc_tasks.append(
process_document(
doc_id, status_doc, split_by_character, split_by_character_only,
pipeline_status, pipeline_status_lock, semaphore
)
)
# 等待所有文档处理完成
await asyncio.gather(*doc_tasks)
```
## 2. Chunk级别并发控制
**控制参数**`llm_model_max_async`
**关键点**每个文档都会独立创建自己的chunk信号量
```python
# lightrag/lightrag.py
llm_model_max_async: int = field(default=int(os.getenv("MAX_ASYNC", 4)))
```
### 实现机制
`extract_entities` 函数中,**每个文档独立创建**自己的chunk信号量
```python
# lightrag/operate.py - extract_entities函数
async def extract_entities(chunks: dict[str, TextChunkSchema], global_config: dict[str, str], ...):
# 🔥 关键:每个文档都会独立创建这个信号量!
llm_model_max_async = global_config.get("llm_model_max_async", 4)
semaphore = asyncio.Semaphore(llm_model_max_async) # 每个文档的chunk信号量
async def _process_with_semaphore(chunk):
async with semaphore: # 🔥 文档内部的chunk并发控制
return await _process_single_content(chunk)
# 为每个chunk创建任务
tasks = []
for c in ordered_chunks:
task = asyncio.create_task(_process_with_semaphore(c))
tasks.append(task)
# 等待所有chunk处理完成
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
chunk_results = [task.result() for task in tasks]
return chunk_results
```
### 重要推论系统整体Chunk并发数
由于每个文档独立创建chunk信号量系统理论上的chunk并发数是
**理论Chunk并发数 = max_parallel_insert × llm_model_max_async**
例如:
- `max_parallel_insert = 2`同时处理2个文档
- `llm_model_max_async = 4`每个文档最多4个chunk并发
- 理论结果:最多 2 × 4 = 8个chunk同时处于"处理中"状态
## 3. LLM请求级别并发控制真正的瓶颈
**控制参数**`llm_model_max_async`(全局共享)
**关键**尽管可能有8个chunk在"处理中"但所有LLM请求共享同一个全局优先级队列
```python
# lightrag/lightrag.py - __post_init__方法
self.llm_model_func = priority_limit_async_func_call(self.llm_model_max_async)(
partial(
self.llm_model_func,
hashing_kv=hashing_kv,
**self.llm_model_kwargs,
)
)
# 🔥 全局LLM队列大小 = llm_model_max_async = 4
```
### 优先级队列实现
```python
# lightrag/utils.py - priority_limit_async_func_call函数
def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
def final_decro(func):
queue = asyncio.PriorityQueue(maxsize=max_queue_size)
tasks = set()
async def worker():
"""Worker that processes tasks in the priority queue"""
while not shutdown_event.is_set():
try:
priority, count, future, args, kwargs = await asyncio.wait_for(queue.get(), timeout=1.0)
result = await func(*args, **kwargs) # 🔥 实际LLM调用
if not future.done():
future.set_result(result)
except Exception as e:
# 错误处理...
finally:
queue.task_done()
# 🔥 创建固定数量的workermax_size个这是真正的并发限制
for _ in range(max_size):
task = asyncio.create_task(worker())
tasks.add(task)
```
## 4. Chunk内部处理机制串行
### 为什么是串行?
每个chunk内部的处理严格按照以下顺序串行执行
```python
# lightrag/operate.py - _process_single_content函数
async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
# 步骤1初始实体提取
hint_prompt = entity_extract_prompt.format(**{**context_base, "input_text": content})
final_result = await use_llm_func_with_cache(hint_prompt, use_llm_func, ...)
# 处理初始提取结果
maybe_nodes, maybe_edges = await _process_extraction_result(final_result, chunk_key, file_path)
# 步骤2Gleaning深挖阶段
for now_glean_index in range(entity_extract_max_gleaning):
# 🔥 串行等待gleaning结果
glean_result = await use_llm_func_with_cache(
continue_prompt, use_llm_func,
llm_response_cache=llm_response_cache,
history_messages=history, cache_type="extract"
)
# 处理gleaning结果
glean_nodes, glean_edges = await _process_extraction_result(glean_result, chunk_key, file_path)
# 合并结果...
# 步骤3判断是否继续循环
if now_glean_index == entity_extract_max_gleaning - 1:
break
# 🔥 串行等待循环判断结果
if_loop_result = await use_llm_func_with_cache(
if_loop_prompt, use_llm_func,
llm_response_cache=llm_response_cache,
history_messages=history, cache_type="extract"
)
if if_loop_result.strip().strip('"').strip("'").lower() != "yes":
break
return maybe_nodes, maybe_edges
```
## 5. 完整的并发层次图
![lightrag_indexing.png](..%2Fassets%2Flightrag_indexing.png)
## 6. 实际运行场景分析
### 场景1单文档多Chunk
假设有1个文档包含6个chunks
- 文档级别只有1个文档不受 `max_parallel_insert` 限制
- Chunk级别最多4个chunks同时处理`llm_model_max_async=4` 限制)
- LLM级别全局最多4个LLM请求并发
**预期行为**4个chunks并发处理剩余2个chunks等待。
### 场景2多文档多Chunk
假设有3个文档每个文档包含10个chunks
- 文档级别最多2个文档同时处理
- Chunk级别每个文档最多4个chunks同时处理
- 理论Chunk并发2 × 4 = 8个chunks同时处理
- 实际LLM并发只有4个LLM请求真正执行
**实际状态分布**
```
# 可能的系统状态:
文档1: 4个chunks"处理中"其中2个在执行LLM2个在等待LLM响应
文档2: 4个chunks"处理中"其中2个在执行LLM2个在等待LLM响应
文档3: 等待文档级别信号量
总计:
- 8个chunks处于"处理中"状态
- 4个LLM请求真正执行
- 4个chunks等待LLM响应
```
### 场景3资源瓶颈分析
[LightRAG 多文档并发处理机制详解.md](LightRAG%20%E5%A4%9A%E6%96%87%E6%A1%A3%E5%B9%B6%E5%8F%91%E5%A4%84%E7%90%86%E6%9C%BA%E5%88%B6%E8%AF%A6%E8%A7%A3.md)
## 7. 性能优化建议
### 理解瓶颈
**真正的瓶颈是全局LLM队列而不是chunk信号量**
### 调整策略
**策略1提高LLM并发能力**
```bash
# 环境变量配置
export MAX_PARALLEL_INSERT=2 # 保持文档并发
export MAX_ASYNC=8 # 🔥 增加LLM请求并发数
```
**策略2平衡文档和LLM并发**
```python
rag = LightRAG(
max_parallel_insert=3, # 适度增加文档并发
llm_model_max_async=12, # 大幅增加LLM并发
entity_extract_max_gleaning=0, # 减少chunk内串行步骤
)
```
## 8. 总结
LightRAG的多文档并发处理机制的关键特点
### 并发层次
1. **文档间争抢**:受 `max_parallel_insert` 控制默认2个文档并发
2. **理论Chunk并发**:每个文档独立创建信号量,总数 = `max_parallel_insert × llm_model_max_async`
3. **实际LLM并发**所有chunk共享全局LLM队列`llm_model_max_async` 控制
4. **单Chunk内串行**每个chunk内的多个LLM请求严格串行执行
### 关键洞察
- **理论vs实际**系统可能有很多chunk在"处理中"但只有少数在真正执行LLM请求
- **真正瓶颈**全局LLM请求队列是性能瓶颈而不是chunk信号量
- **优化重点**:提高 `llm_model_max_async` 比增加 `max_parallel_insert` 更有效