diff --git a/docs/LightRAG Multi-Document Processing: Concurrent Control Strategy Analysis.md b/docs/LightRAG Multi-Document Processing: Concurrent Control Strategy Analysis.md
new file mode 100644
index 00000000..de3ef881
--- /dev/null
+++ b/docs/LightRAG Multi-Document Processing: Concurrent Control Strategy Analysis.md

# LightRAG Multi-Document Processing: Concurrent Control Strategy Analysis

LightRAG employs a multi-layered concurrency control strategy when processing multiple documents. This article analyzes the control mechanisms at the document level, chunk level, and LLM request level, so you can understand why the system exhibits the concurrency behavior it does.

## Overview

LightRAG's concurrency control is divided into three layers:

1. **Document-level concurrency**: limits how many documents are processed simultaneously
2. **Chunk-level concurrency**: limits how many chunks are processed simultaneously within a single document
3. **LLM request-level concurrency**: limits the global number of concurrent LLM requests

## 1. Document-Level Concurrent Control

**Control parameter**: `max_parallel_insert`

Document-level concurrency is controlled by the `max_parallel_insert` parameter, which defaults to 2.

```python
# lightrag/lightrag.py
max_parallel_insert: int = field(default=int(os.getenv("MAX_PARALLEL_INSERT", 2)))
```

### Implementation Mechanism

The `apipeline_process_enqueue_documents` method uses a semaphore to cap document concurrency:

```python
# lightrag/lightrag.py - apipeline_process_enqueue_documents method
async def process_document(
    doc_id: str,
    status_doc: DocProcessingStatus,
    split_by_character: str | None,
    split_by_character_only: bool,
    pipeline_status: dict,
    pipeline_status_lock: asyncio.Lock,
    semaphore: asyncio.Semaphore,  # Document-level semaphore
) -> None:
    """Process single document"""
    async with semaphore:  # 🔥 Document-level concurrent control
        ...  # Process all chunks of a single document

# Create document-level semaphore
semaphore = asyncio.Semaphore(self.max_parallel_insert)  # Default 2

# Create a processing task for each document
doc_tasks = []
for doc_id, status_doc in to_process_docs.items():
    doc_tasks.append(
        process_document(
            doc_id, status_doc, split_by_character, split_by_character_only,
            pipeline_status, pipeline_status_lock, semaphore
        )
    )

# Wait for all documents to finish processing
await asyncio.gather(*doc_tasks)
```
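
To see this gate in isolation, here is a minimal, runnable sketch of the same pattern. Everything in it (the fake work inside `process_document`, the document names, the timings) is illustrative and not LightRAG's actual pipeline code:

```python
import asyncio

MAX_PARALLEL_INSERT = 2  # mirrors LightRAG's default

async def process_document(doc_id: str, gate: asyncio.Semaphore) -> None:
    async with gate:  # at most MAX_PARALLEL_INSERT bodies run at once
        print(f"{doc_id} started")
        await asyncio.sleep(1.0)  # stand-in for chunking + entity extraction
        print(f"{doc_id} finished")

async def main() -> None:
    gate = asyncio.Semaphore(MAX_PARALLEL_INSERT)
    await asyncio.gather(*(process_document(f"doc-{i}", gate) for i in range(5)))

asyncio.run(main())
```

Running it shows `doc-0` and `doc-1` starting immediately, while `doc-2` starts only after one of them finishes, which is exactly what the semaphore enforces for documents.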
## 2. Chunk-Level Concurrent Control

**Control parameter**: `llm_model_max_async`

**Key point**: each document creates its own, independent chunk semaphore!

```python
# lightrag/lightrag.py
llm_model_max_async: int = field(default=int(os.getenv("MAX_ASYNC", 4)))
```

### Implementation Mechanism

In the `extract_entities` function, **each document independently creates** its own chunk semaphore:

```python
# lightrag/operate.py - extract_entities function
async def extract_entities(chunks: dict[str, TextChunkSchema], global_config: dict[str, str], ...):
    # 🔥 Key: every document creates this semaphore independently!
    llm_model_max_async = global_config.get("llm_model_max_async", 4)
    semaphore = asyncio.Semaphore(llm_model_max_async)  # Chunk semaphore for this document

    async def _process_with_semaphore(chunk):
        async with semaphore:  # 🔥 Chunk concurrency control within the document
            return await _process_single_content(chunk)

    # Create a task for each chunk
    tasks = []
    for c in ordered_chunks:
        task = asyncio.create_task(_process_with_semaphore(c))
        tasks.append(task)

    # Wait for all chunks to finish (simplified: error/cancellation handling omitted)
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    chunk_results = [task.result() for task in tasks]
    return chunk_results
```

### Important Inference: Overall System Chunk Concurrency

Because each document creates its own chunk semaphore, the theoretical chunk concurrency of the system is:

**Theoretical chunk concurrency = max_parallel_insert × llm_model_max_async**

For example:
- `max_parallel_insert = 2` (process 2 documents simultaneously)
- `llm_model_max_async = 4` (at most 4 concurrent chunks per document)
- **Theoretical result**: at most 2 × 4 = 8 chunks in the "processing" state at once

## 3. LLM Request-Level Concurrent Control (The Real Bottleneck)

**Control parameter**: `llm_model_max_async` (globally shared)

**Key**: although up to 8 chunks may be "in processing", all LLM requests share a single global priority queue!

```python
# lightrag/lightrag.py - __post_init__ method
self.llm_model_func = priority_limit_async_func_call(self.llm_model_max_async)(
    partial(
        self.llm_model_func,
        hashing_kv=hashing_kv,
        **self.llm_model_kwargs,
    )
)
# 🔥 Global LLM worker count = llm_model_max_async = 4
```

### Priority Queue Implementation

```python
# lightrag/utils.py - priority_limit_async_func_call function (simplified)
def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
    def final_decro(func):
        queue = asyncio.PriorityQueue(maxsize=max_queue_size)
        tasks = set()

        async def worker():
            """Worker that processes tasks in the priority queue"""
            while not shutdown_event.is_set():
                try:
                    priority, count, future, args, kwargs = await asyncio.wait_for(
                        queue.get(), timeout=1.0
                    )
                except asyncio.TimeoutError:
                    continue  # No queued task yet; re-check the shutdown flag
                try:
                    result = await func(*args, **kwargs)  # 🔥 Actual LLM call
                    if not future.done():
                        future.set_result(result)
                except Exception as e:
                    ...  # Error handling: propagate e via future.set_exception
                finally:
                    queue.task_done()

        # 🔥 A fixed number of workers (max_size) - this is the real concurrency limit
        for _ in range(max_size):
            task = asyncio.create_task(worker())
            tasks.add(task)
```
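
The effect is easier to see with a stripped-down stand-in: a plain queue drained by a fixed pool of workers (priorities, timeouts, and shutdown handling omitted). All names here are illustrative, not LightRAG's API:

```python
import asyncio

async def worker(queue: asyncio.Queue, func) -> None:
    # Each worker handles one job at a time, so the pool size caps concurrency.
    while True:
        future, args = await queue.get()
        try:
            future.set_result(await func(*args))
        except Exception as exc:
            future.set_exception(exc)
        finally:
            queue.task_done()

async def limited_call(queue: asyncio.Queue, *args):
    future = asyncio.get_running_loop().create_future()
    await queue.put((future, args))
    return await future  # resolves when a worker finishes the job

async def fake_llm(i: int) -> int:
    print(f"request {i} running")
    await asyncio.sleep(0.5)  # stand-in for LLM latency
    return i

async def main() -> None:
    queue: asyncio.Queue = asyncio.Queue()
    workers = [asyncio.create_task(worker(queue, fake_llm)) for _ in range(4)]
    # 10 callers queue up, but at most 4 "running" lines appear at any moment.
    print(await asyncio.gather(*(limited_call(queue, i) for i in range(10))))
    for w in workers:
        w.cancel()

asyncio.run(main())
```

Note that the queue capacity (`max_queue_size=1000`) only bounds how many requests can *wait*; the worker count is what bounds how many *execute*.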
## 4. Chunk Internal Processing Mechanism (Serial)

### Why Serial?

Inside each chunk, processing strictly follows this serial execution order:

```python
# lightrag/operate.py - _process_single_content function
async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
    # Step 1: Initial entity extraction
    hint_prompt = entity_extract_prompt.format(**{**context_base, "input_text": content})
    final_result = await use_llm_func_with_cache(hint_prompt, use_llm_func, ...)

    # Process the initial extraction results
    maybe_nodes, maybe_edges = await _process_extraction_result(final_result, chunk_key, file_path)

    # Step 2: Gleaning phase
    for now_glean_index in range(entity_extract_max_gleaning):
        # 🔥 Serial wait for the gleaning result
        glean_result = await use_llm_func_with_cache(
            continue_prompt, use_llm_func,
            llm_response_cache=llm_response_cache,
            history_messages=history, cache_type="extract"
        )

        # Process the gleaning results
        glean_nodes, glean_edges = await _process_extraction_result(glean_result, chunk_key, file_path)

        # Merge results...

        # Step 3: Decide whether to continue the loop
        if now_glean_index == entity_extract_max_gleaning - 1:
            break

        # 🔥 Serial wait for the loop decision
        if_loop_result = await use_llm_func_with_cache(
            if_loop_prompt, use_llm_func,
            llm_response_cache=llm_response_cache,
            history_messages=history, cache_type="extract"
        )

        if if_loop_result.strip().strip('"').strip("'").lower() != "yes":
            break

    return maybe_nodes, maybe_edges
```
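
Because these awaits never overlap, the gleaning setting multiplies per-chunk latency. A back-of-envelope helper, derived from the control flow above (worst case; the loop-decision call can end gleaning early):

```python
def llm_calls_per_chunk(max_gleaning: int) -> int:
    """Worst-case serial LLM calls for one chunk, per the control flow above:
    1 initial extraction + max_gleaning gleaning calls
    + one loop-decision call between iterations (none after the last)."""
    if max_gleaning <= 0:
        return 1
    return 1 + max_gleaning + (max_gleaning - 1)

# e.g. max_gleaning=1 -> 2 serial calls; max_gleaning=2 -> 4 serial calls.
# With roughly T seconds per LLM round-trip, one chunk takes at least
# llm_calls_per_chunk(max_gleaning) * T of wall-clock time.
print([llm_calls_per_chunk(g) for g in range(4)])  # [1, 2, 4, 6]
```

This is also why Strategy 2 in Section 7 below sets `entity_extract_max_gleaning=0`: it shortens the serial path inside every chunk.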
## 5. Complete Concurrency Hierarchy Diagram
![lightrag_indexing.png](assets/lightrag_indexing.png)

### Chunk Internal Processing (Serial)
```
Initial Extraction → Gleaning → Loop Decision → Complete
```

## 6. Real-World Scenario Analysis

### Scenario 1: Single Document with Multiple Chunks
Assume 1 document with 6 chunks:

- **Document level**: only 1 document, so `max_parallel_insert` is not a constraint
- **Chunk level**: at most 4 chunks processed simultaneously (limited by `llm_model_max_async=4`)
- **LLM level**: at most 4 concurrent LLM requests globally

**Expected behavior**: 4 chunks are processed concurrently while the remaining 2 wait.

### Scenario 2: Multiple Documents with Multiple Chunks
Assume 3 documents, each with 10 chunks:

- **Document level**: at most 2 documents processed simultaneously
- **Chunk level**: at most 4 chunks per document processed simultaneously
- **Theoretical chunk concurrency**: 2 × 4 = 8 chunks processed simultaneously
- **Actual LLM concurrency**: only 4 LLM requests actually execute

**Actual state distribution**:
```
# A possible system state:
Document 1: 4 chunks "processing" (2 executing LLM calls, 2 waiting for an LLM slot)
Document 2: 4 chunks "processing" (2 executing LLM calls, 2 waiting for an LLM slot)
Document 3: waiting on the document-level semaphore

Total:
- 8 chunks in the "processing" state
- 4 LLM requests actually executing
- 4 chunks waiting for an LLM slot
```

### Scenario 3: Resource Bottleneck Analysis

**Timeline example (max_parallel_insert=2, llm_model_max_async=4)**:
![lightrag_llm_request_timeline.png](assets/lightrag_llm_request_timeline.png)
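
Scenario 2 can be reproduced with a small simulation: nested document/chunk semaphores plus a single global gate standing in for the 4 LLM workers. The counters, chunk counts, and timings are illustrative only:

```python
import asyncio

MAX_PARALLEL_INSERT = 2  # document-level limit
MAX_ASYNC = 4            # per-document chunk limit AND global LLM worker count

processing = 0  # chunks holding a chunk-semaphore slot ("processing")
in_llm = 0      # chunks actually inside an "LLM call"

async def process_chunk(doc: int, chunk: int, llm_gate: asyncio.Semaphore) -> None:
    global processing, in_llm
    processing += 1
    async with llm_gate:  # only MAX_ASYNC chunks hold an LLM slot at once
        in_llm += 1
        print(f"doc{doc}/chunk{chunk}: processing={processing}, in_llm={in_llm}")
        await asyncio.sleep(0.2)  # stand-in for one LLM round-trip
        in_llm -= 1
    processing -= 1

async def process_document(doc: int, doc_gate: asyncio.Semaphore,
                           llm_gate: asyncio.Semaphore) -> None:
    async with doc_gate:
        chunk_gate = asyncio.Semaphore(MAX_ASYNC)  # created per document

        async def run(chunk: int) -> None:
            async with chunk_gate:
                await process_chunk(doc, chunk, llm_gate)

        await asyncio.gather(*(run(c) for c in range(10)))

async def main() -> None:
    doc_gate = asyncio.Semaphore(MAX_PARALLEL_INSERT)  # documents
    llm_gate = asyncio.Semaphore(MAX_ASYNC)            # global "LLM workers"
    await asyncio.gather(*(process_document(d, doc_gate, llm_gate) for d in range(3)))

asyncio.run(main())
# processing climbs to 8, but in_llm never exceeds 4 - matching the state above.
```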
## 7. Performance Optimization Recommendations

### Understanding the Bottleneck

**The real bottleneck is the global LLM queue, not the chunk semaphores!**

### Adjustment Strategies

**Strategy 1: Increase LLM Concurrent Capacity**

```bash
# Environment variable configuration
export MAX_PARALLEL_INSERT=2   # Keep document concurrency as-is
export MAX_ASYNC=8             # 🔥 Increase LLM request concurrency
```

**Strategy 2: Balance Document and LLM Concurrency**

```python
rag = LightRAG(
    max_parallel_insert=3,          # Moderately increase document concurrency
    llm_model_max_async=12,         # Significantly increase LLM concurrency
    entity_extract_max_gleaning=0,  # Reduce serial steps within each chunk
)
```

## 8. Summary

Key characteristics of LightRAG's multi-document concurrent processing mechanism:

### Concurrency Layers
1. **Inter-document competition**: controlled by `max_parallel_insert`; by default 2 documents run concurrently
2. **Theoretical chunk concurrency**: each document creates its own semaphore, so the total = `max_parallel_insert × llm_model_max_async`
3. **Actual LLM concurrency**: all chunks share the global LLM queue, capped by `llm_model_max_async`
4. **Intra-chunk serialism**: the multiple LLM requests within a chunk execute strictly serially

### Key Insights
- **Theoretical vs. actual**: many chunks may be "in processing", but only a few are actually executing LLM requests
- **Real bottleneck**: the global LLM request queue is the performance bottleneck, not the chunk semaphores
- **Optimization focus**: raising `llm_model_max_async` is more effective than raising `max_parallel_insert`
diff --git a/docs/assets/lightrag_llm_request_timeline.png b/docs/assets/lightrag_llm_request_timeline.png
new file mode 100644
index 00000000..a494328b
Binary files /dev/null and b/docs/assets/lightrag_llm_request_timeline.png differ
diff --git a/docs/zh/LightRAG 多文档并发处理机制详解.md b/docs/zh/LightRAG 多文档并发处理机制详解.md
new file mode 100644
index 00000000..bdbb3c30
--- /dev/null
+++ b/docs/zh/LightRAG 多文档并发处理机制详解.md

# LightRAG Multi-Document Concurrency Control Mechanism Explained

LightRAG applies a multi-layered concurrency control strategy when processing multiple documents. This article analyzes the control mechanisms at the document level, chunk level, and LLM request level, to help you understand why particular concurrency behavior appears.

## Overview

LightRAG's concurrency control has three layers:

1. Document-level concurrency: limits how many documents are processed at the same time
2. Chunk-level concurrency: limits how many chunks are processed at the same time within one document
3. LLM request-level concurrency: limits the global number of concurrent LLM requests

## 1. Document-Level Concurrency Control

**Control parameter**: `max_parallel_insert`

Document-level concurrency is governed by the `max_parallel_insert` parameter, which defaults to 2.

```python
# lightrag/lightrag.py
max_parallel_insert: int = field(default=int(os.getenv("MAX_PARALLEL_INSERT", 2)))
```

### Implementation

The `apipeline_process_enqueue_documents` method uses a semaphore to limit document concurrency:

```python
# lightrag/lightrag.py - apipeline_process_enqueue_documents method
async def process_document(
    doc_id: str,
    status_doc: DocProcessingStatus,
    split_by_character: str | None,
    split_by_character_only: bool,
    pipeline_status: dict,
    pipeline_status_lock: asyncio.Lock,
    semaphore: asyncio.Semaphore,  # Document-level semaphore
) -> None:
    """Process single document"""
    async with semaphore:  # 🔥 Document-level concurrency control
        ...  # Process all chunks of this document

# Create the document-level semaphore
semaphore = asyncio.Semaphore(self.max_parallel_insert)  # Default 2

# Create a processing task per document
doc_tasks = []
for doc_id, status_doc in to_process_docs.items():
    doc_tasks.append(
        process_document(
            doc_id, status_doc, split_by_character, split_by_character_only,
            pipeline_status, pipeline_status_lock, semaphore
        )
    )

# Wait for all documents to finish
await asyncio.gather(*doc_tasks)
```

## 2. Chunk-Level Concurrency Control

**Control parameter**: `llm_model_max_async`

**Key point**: each document creates its own chunk semaphore!

```python
# lightrag/lightrag.py
llm_model_max_async: int = field(default=int(os.getenv("MAX_ASYNC", 4)))
```

### Implementation

In `extract_entities`, **each document independently creates** its own chunk semaphore:

```python
# lightrag/operate.py - extract_entities function
async def extract_entities(chunks: dict[str, TextChunkSchema], global_config: dict[str, str], ...):
    # 🔥 Key: every document creates this semaphore independently!
    llm_model_max_async = global_config.get("llm_model_max_async", 4)
    semaphore = asyncio.Semaphore(llm_model_max_async)  # This document's chunk semaphore

    async def _process_with_semaphore(chunk):
        async with semaphore:  # 🔥 Chunk concurrency control within the document
            return await _process_single_content(chunk)

    # Create a task per chunk
    tasks = []
    for c in ordered_chunks:
        task = asyncio.create_task(_process_with_semaphore(c))
        tasks.append(task)

    # Wait for all chunks to finish (simplified: error/cancellation handling omitted)
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
    chunk_results = [task.result() for task in tasks]
    return chunk_results
```

### Important Corollary: System-Wide Chunk Concurrency

Because each document creates its own chunk semaphore, the system's theoretical chunk concurrency is:

**Theoretical chunk concurrency = max_parallel_insert × llm_model_max_async**

For example:
- `max_parallel_insert = 2` (2 documents processed at once)
- `llm_model_max_async = 4` (at most 4 concurrent chunks per document)
- Theoretical result: at most 2 × 4 = 8 chunks in the "processing" state at once

## 3. LLM Request-Level Concurrency Control (the Real Bottleneck)

**Control parameter**: `llm_model_max_async` (globally shared)

**Key**: even though 8 chunks may be "processing", all LLM requests share one global priority queue!

```python
# lightrag/lightrag.py - __post_init__ method
self.llm_model_func = priority_limit_async_func_call(self.llm_model_max_async)(
    partial(
        self.llm_model_func,
        hashing_kv=hashing_kv,
        **self.llm_model_kwargs,
    )
)
# 🔥 Global LLM worker count = llm_model_max_async = 4
```

### Priority Queue Implementation

```python
# lightrag/utils.py - priority_limit_async_func_call function (simplified)
def priority_limit_async_func_call(max_size: int, max_queue_size: int = 1000):
    def final_decro(func):
        queue = asyncio.PriorityQueue(maxsize=max_queue_size)
        tasks = set()

        async def worker():
            """Worker that processes tasks in the priority queue"""
            while not shutdown_event.is_set():
                try:
                    priority, count, future, args, kwargs = await asyncio.wait_for(
                        queue.get(), timeout=1.0
                    )
                except asyncio.TimeoutError:
                    continue  # No queued task yet; re-check the shutdown flag
                try:
                    result = await func(*args, **kwargs)  # 🔥 Actual LLM call
                    if not future.done():
                        future.set_result(result)
                except Exception as e:
                    ...  # Error handling: propagate e via future.set_exception
                finally:
                    queue.task_done()

        # 🔥 A fixed number of workers (max_size) - the real concurrency limit
        for _ in range(max_size):
            task = asyncio.create_task(worker())
            tasks.add(task)
```

## 4. Chunk-Internal Processing (Serial)

### Why Serial?

Within each chunk, processing follows this strictly serial order:

```python
# lightrag/operate.py - _process_single_content function
async def _process_single_content(chunk_key_dp: tuple[str, TextChunkSchema]):
    # Step 1: Initial entity extraction
    hint_prompt = entity_extract_prompt.format(**{**context_base, "input_text": content})
    final_result = await use_llm_func_with_cache(hint_prompt, use_llm_func, ...)

    # Process the initial extraction results
    maybe_nodes, maybe_edges = await _process_extraction_result(final_result, chunk_key, file_path)

    # Step 2: Gleaning phase
    for now_glean_index in range(entity_extract_max_gleaning):
        # 🔥 Serial wait for the gleaning result
        glean_result = await use_llm_func_with_cache(
            continue_prompt, use_llm_func,
            llm_response_cache=llm_response_cache,
            history_messages=history, cache_type="extract"
        )

        # Process the gleaning results
        glean_nodes, glean_edges = await _process_extraction_result(glean_result, chunk_key, file_path)

        # Merge results...

        # Step 3: Decide whether to continue the loop
        if now_glean_index == entity_extract_max_gleaning - 1:
            break

        # 🔥 Serial wait for the loop decision
        if_loop_result = await use_llm_func_with_cache(
            if_loop_prompt, use_llm_func,
            llm_response_cache=llm_response_cache,
            history_messages=history, cache_type="extract"
        )

        if if_loop_result.strip().strip('"').strip("'").lower() != "yes":
            break

    return maybe_nodes, maybe_edges
```

## 5. Complete Concurrency Hierarchy Diagram
![lightrag_indexing.png](../assets/lightrag_indexing.png)
## 6. Real-World Scenario Analysis

### Scenario 1: One Document, Multiple Chunks
Assume 1 document with 6 chunks:

- Document level: only 1 document, so `max_parallel_insert` is not a constraint
- Chunk level: at most 4 chunks processed at once (limited by `llm_model_max_async=4`)
- LLM level: at most 4 concurrent LLM requests globally

**Expected behavior**: 4 chunks are processed concurrently; the other 2 wait.

### Scenario 2: Multiple Documents, Multiple Chunks
Assume 3 documents, each with 10 chunks:

- Document level: at most 2 documents processed at once
- Chunk level: at most 4 chunks per document processed at once
- Theoretical chunk concurrency: 2 × 4 = 8 chunks processed at once
- Actual LLM concurrency: only 4 LLM requests really execute

**Actual state distribution**:
```
# A possible system state:
Document 1: 4 chunks "processing" (2 executing LLM calls, 2 waiting for an LLM slot)
Document 2: 4 chunks "processing" (2 executing LLM calls, 2 waiting for an LLM slot)
Document 3: waiting on the document-level semaphore

Total:
- 8 chunks in the "processing" state
- 4 LLM requests actually executing
- 4 chunks waiting for an LLM slot
```

### Scenario 3: Resource Bottleneck Analysis
![lightrag_llm_request_timeline.png](../assets/lightrag_llm_request_timeline.png)

## 7. Performance Tuning Recommendations

### Understand the Bottleneck

**The real bottleneck is the global LLM queue, not the chunk semaphores!**

### Tuning Strategies

**Strategy 1: Increase LLM concurrency**

```bash
# Environment variable configuration
export MAX_PARALLEL_INSERT=2   # Keep document concurrency as-is
export MAX_ASYNC=8             # 🔥 Increase LLM request concurrency
```

**Strategy 2: Balance document and LLM concurrency**

```python
rag = LightRAG(
    max_parallel_insert=3,          # Moderately increase document concurrency
    llm_model_max_async=12,         # Significantly increase LLM concurrency
    entity_extract_max_gleaning=0,  # Reduce serial steps within each chunk
)
```

## 8. Summary

Key characteristics of LightRAG's multi-document concurrency mechanism:

### Concurrency Layers
1. **Inter-document competition**: governed by `max_parallel_insert`; by default 2 documents run concurrently
2. **Theoretical chunk concurrency**: each document creates its own semaphore, so the total = `max_parallel_insert × llm_model_max_async`
3. **Actual LLM concurrency**: all chunks share the global LLM queue, capped by `llm_model_max_async`
4. **Intra-chunk serialism**: the multiple LLM requests within one chunk execute strictly serially

### Key Insights
- **Theory vs. practice**: many chunks may be "processing", but only a few are actually executing LLM requests
- **Real bottleneck**: the global LLM request queue, not the chunk semaphores
- **Optimization focus**: raising `llm_model_max_async` is more effective than raising `max_parallel_insert`
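
As a closing rule of thumb, the relationships above can be captured in a tiny helper. This is not a LightRAG API, just a back-of-envelope sketch, and it assumes every running document still has at least `max_async` chunks left to process:

```python
def concurrency_profile(max_parallel_insert: int, max_async: int, docs: int) -> dict:
    """Back-of-envelope numbers from the analysis above (not a LightRAG API)."""
    docs_running = min(docs, max_parallel_insert)
    chunks_in_flight = docs_running * max_async        # theoretical "processing"
    llm_executing = min(chunks_in_flight, max_async)   # global worker pool cap
    return {
        "docs_running": docs_running,
        "chunks_in_flight": chunks_in_flight,
        "llm_executing": llm_executing,
        "chunks_waiting": chunks_in_flight - llm_executing,
    }

print(concurrency_profile(max_parallel_insert=2, max_async=4, docs=3))
# {'docs_running': 2, 'chunks_in_flight': 8, 'llm_executing': 4, 'chunks_waiting': 4}
```

Raising `max_parallel_insert` only grows `chunks_in_flight`; only `max_async` grows `llm_executing`, which is why the tuning advice above focuses on `MAX_ASYNC`.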