Remove manual initialize_pipeline_status() calls across codebase

- Auto-init pipeline status in storages
- Remove redundant import statements
- Simplify initialization pattern
- Update docs and examples

(cherry picked from commit cdd53ee875)
This commit is contained in:
yangdx 2025-11-17 07:28:41 +08:00 committed by Raphaël MANSUY
parent 1e7bd654d8
commit 87561f8b28
21 changed files with 167 additions and 110 deletions

View file

@ -221,6 +221,10 @@ python examples/lightrag_openai_demo.py
> ⚠️ **如果您希望将LightRAG集成到您的项目中建议您使用LightRAG Server提供的REST API**。LightRAG Core通常用于嵌入式应用或供希望进行研究与评估的学者使用。
### ⚠️ 重要:初始化要求
LightRAG 在使用前需要显式初始化。 创建 LightRAG 实例后,您必须调用 await rag.initialize_storages(),否则将出现错误。
### 一个简单程序
以下Python代码片段演示了如何初始化LightRAG、插入文本并进行查询
@ -230,7 +234,6 @@ import os
import asyncio
from lightrag import LightRAG, QueryParam
from lightrag.llm.openai import gpt_4o_mini_complete, gpt_4o_complete, openai_embed
from lightrag.kg.shared_storage import initialize_pipeline_status
from lightrag.utils import setup_logger
setup_logger("lightrag", level="INFO")
@ -245,9 +248,7 @@ async def initialize_rag():
embedding_func=openai_embed,
llm_model_func=gpt_4o_mini_complete,
)
await rag.initialize_storages()
await initialize_pipeline_status()
return rag
await rag.initialize_storages() return rag
async def main():
try:
@ -441,8 +442,6 @@ async def initialize_rag():
)
await rag.initialize_storages()
await initialize_pipeline_status()
return rag
```
@ -571,7 +570,6 @@ from lightrag import LightRAG
from lightrag.llm.llama_index_impl import llama_index_complete_if_cache, llama_index_embed
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from lightrag.kg.shared_storage import initialize_pipeline_status
from lightrag.utils import setup_logger
# 为LightRAG设置日志处理程序
@ -588,8 +586,6 @@ async def initialize_rag():
)
await rag.initialize_storages()
await initialize_pipeline_status()
return rag
def main():
@ -839,8 +835,6 @@ async def initialize_rag():
# 初始化数据库连接
await rag.initialize_storages()
# 初始化文档处理的管道状态
await initialize_pipeline_status()
return rag
```

View file

@ -223,10 +223,7 @@ For a streaming response implementation example, please see `examples/lightrag_o
### ⚠️ Important: Initialization Requirements
**LightRAG requires explicit initialization before use.** You must call both `await rag.initialize_storages()` and `await initialize_pipeline_status()` after creating a LightRAG instance, otherwise you will encounter errors like:
- `AttributeError: __aenter__` - if storages are not initialized
- `KeyError: 'history_messages'` - if pipeline status is not initialized
**LightRAG requires explicit initialization before use.** You must call `await rag.initialize_storages()` after creating a LightRAG instance, otherwise you will encounter errors.
### A Simple Program
@ -237,7 +234,6 @@ import os
import asyncio
from lightrag import LightRAG, QueryParam
from lightrag.llm.openai import gpt_4o_mini_complete, gpt_4o_complete, openai_embed
from lightrag.kg.shared_storage import initialize_pipeline_status
from lightrag.utils import setup_logger
setup_logger("lightrag", level="INFO")
@ -253,9 +249,7 @@ async def initialize_rag():
llm_model_func=gpt_4o_mini_complete,
)
# IMPORTANT: Both initialization calls are required!
await rag.initialize_storages() # Initialize storage backends
await initialize_pipeline_status() # Initialize processing pipeline
return rag
await rag.initialize_storages() # Initialize storage backends return rag
async def main():
try:
@ -444,8 +438,6 @@ async def initialize_rag():
)
await rag.initialize_storages()
await initialize_pipeline_status()
return rag
```
@ -576,7 +568,6 @@ from lightrag import LightRAG
from lightrag.llm.llama_index_impl import llama_index_complete_if_cache, llama_index_embed
from llama_index.embeddings.openai import OpenAIEmbedding
from llama_index.llms.openai import OpenAI
from lightrag.kg.shared_storage import initialize_pipeline_status
from lightrag.utils import setup_logger
# Setup log handler for LightRAG
@ -593,8 +584,6 @@ async def initialize_rag():
)
await rag.initialize_storages()
await initialize_pipeline_status()
return rag
def main():
@ -846,8 +835,6 @@ async def initialize_rag():
# Initialize database connections
await rag.initialize_storages()
# Initialize pipeline status for document processing
await initialize_pipeline_status()
return rag
```
@ -932,8 +919,6 @@ async def initialize_rag():
# Initialize database connections
await rag.initialize_storages()
# Initialize pipeline status for document processing
await initialize_pipeline_status()
return rag
```
@ -1541,16 +1526,13 @@ If you encounter these errors when using LightRAG:
2. **`KeyError: 'history_messages'`**
- **Cause**: Pipeline status not initialized
- **Solution**: Call `await initialize_pipeline_status()` after initializing storages
- **Solution**: Call `
3. **Both errors in sequence**
- **Cause**: Neither initialization method was called
- **Solution**: Always follow this pattern:
```python
rag = LightRAG(...)
await rag.initialize_storages()
await initialize_pipeline_status()
```
await rag.initialize_storages() ```
### Model Switching Issues

View file

@ -6,7 +6,6 @@ import numpy as np
from dotenv import load_dotenv
import logging
from openai import AzureOpenAI
from lightrag.kg.shared_storage import initialize_pipeline_status
logging.basicConfig(level=logging.INFO)
@ -93,9 +92,7 @@ async def initialize_rag():
),
)
await rag.initialize_storages()
await initialize_pipeline_status()
await rag.initialize_storages() # Auto-initializes pipeline_status
return rag

View file

@ -6,7 +6,6 @@ import logging.config
from lightrag import LightRAG, QueryParam
from lightrag.llm.ollama import ollama_model_complete, ollama_embed
from lightrag.utils import EmbeddingFunc, logger, set_verbose_debug
from lightrag.kg.shared_storage import initialize_pipeline_status
from dotenv import load_dotenv
@ -104,9 +103,7 @@ async def initialize_rag():
),
)
await rag.initialize_storages()
await initialize_pipeline_status()
await rag.initialize_storages() # Auto-initializes pipeline_status
return rag

View file

@ -7,7 +7,6 @@ from lightrag import LightRAG, QueryParam
from lightrag.llm.openai import openai_complete_if_cache
from lightrag.llm.ollama import ollama_embed
from lightrag.utils import EmbeddingFunc, logger, set_verbose_debug
from lightrag.kg.shared_storage import initialize_pipeline_status
from dotenv import load_dotenv
@ -120,9 +119,7 @@ async def initialize_rag():
),
)
await rag.initialize_storages()
await initialize_pipeline_status()
await rag.initialize_storages() # Auto-initializes pipeline_status
return rag

View file

@ -4,7 +4,6 @@ import logging
import logging.config
from lightrag import LightRAG, QueryParam
from lightrag.llm.openai import gpt_4o_mini_complete, openai_embed
from lightrag.kg.shared_storage import initialize_pipeline_status
from lightrag.utils import logger, set_verbose_debug
WORKING_DIR = "./dickens"
@ -84,8 +83,7 @@ async def initialize_rag():
llm_model_func=gpt_4o_mini_complete,
)
await rag.initialize_storages()
await initialize_pipeline_status()
await rag.initialize_storages() # Auto-initializes pipeline_status
return rag

View file

@ -4,7 +4,6 @@ from lightrag import LightRAG, QueryParam
from lightrag.llm.openai import gpt_4o_mini_complete, openai_embed
from lightrag.utils import EmbeddingFunc
import numpy as np
from lightrag.kg.shared_storage import initialize_pipeline_status
#########
# Uncomment the below two lines if running in a jupyter notebook to handle the async nature of rag.insert()
@ -61,9 +60,7 @@ async def initialize_rag():
log_level="DEBUG",
)
await rag.initialize_storages()
await initialize_pipeline_status()
await rag.initialize_storages() # Auto-initializes pipeline_status
return rag

View file

@ -7,7 +7,6 @@ This example demonstrates how to use LightRAG's modal processors directly withou
import asyncio
import argparse
from lightrag.llm.openai import openai_complete_if_cache, openai_embed
from lightrag.kg.shared_storage import initialize_pipeline_status
from lightrag import LightRAG
from lightrag.utils import EmbeddingFunc
from raganything.modalprocessors import (
@ -190,9 +189,7 @@ async def initialize_rag(api_key: str, base_url: str = None):
),
)
await rag.initialize_storages()
await initialize_pipeline_status()
await rag.initialize_storages() # Auto-initializes pipeline_status
return rag

View file

@ -29,7 +29,6 @@ import numpy as np
from lightrag import LightRAG, QueryParam
from lightrag.llm.openai import openai_complete_if_cache, openai_embed
from lightrag.utils import EmbeddingFunc, setup_logger
from lightrag.kg.shared_storage import initialize_pipeline_status
from functools import partial
from lightrag.rerank import cohere_rerank
@ -94,9 +93,7 @@ async def create_rag_with_rerank():
rerank_model_func=rerank_model_func,
)
await rag.initialize_storages()
await initialize_pipeline_status()
await rag.initialize_storages() # Auto-initializes pipeline_status
return rag

View file

@ -8,7 +8,6 @@ import logging
from lightrag import LightRAG, QueryParam
from lightrag.llm.bedrock import bedrock_complete, bedrock_embed
from lightrag.utils import EmbeddingFunc
from lightrag.kg.shared_storage import initialize_pipeline_status
import asyncio
import nest_asyncio
@ -32,9 +31,7 @@ async def initialize_rag():
),
)
await rag.initialize_storages()
await initialize_pipeline_status()
await rag.initialize_storages() # Auto-initializes pipeline_status
return rag

View file

@ -5,7 +5,6 @@ import logging
import logging.config
from lightrag import LightRAG, QueryParam
from lightrag.utils import EmbeddingFunc, logger, set_verbose_debug
from lightrag.kg.shared_storage import initialize_pipeline_status
import requests
import numpy as np
@ -221,9 +220,7 @@ async def initialize_rag():
),
)
await rag.initialize_storages()
await initialize_pipeline_status()
await rag.initialize_storages() # Auto-initializes pipeline_status
return rag

View file

@ -4,7 +4,6 @@ from lightrag import LightRAG, QueryParam
from lightrag.llm.hf import hf_model_complete, hf_embed
from lightrag.utils import EmbeddingFunc
from transformers import AutoModel, AutoTokenizer
from lightrag.kg.shared_storage import initialize_pipeline_status
import asyncio
import nest_asyncio
@ -37,9 +36,7 @@ async def initialize_rag():
),
)
await rag.initialize_storages()
await initialize_pipeline_status()
await rag.initialize_storages() # Auto-initializes pipeline_status
return rag

View file

@ -12,7 +12,6 @@ import nest_asyncio
nest_asyncio.apply()
from lightrag.kg.shared_storage import initialize_pipeline_status
# Configure working directory
WORKING_DIR = "./index_default"
@ -94,9 +93,7 @@ async def initialize_rag():
),
)
await rag.initialize_storages()
await initialize_pipeline_status()
await rag.initialize_storages() # Auto-initializes pipeline_status
return rag

View file

@ -12,7 +12,6 @@ import nest_asyncio
nest_asyncio.apply()
from lightrag.kg.shared_storage import initialize_pipeline_status
# Configure working directory
WORKING_DIR = "./index_default"
@ -96,9 +95,7 @@ async def initialize_rag():
),
)
await rag.initialize_storages()
await initialize_pipeline_status()
await rag.initialize_storages() # Auto-initializes pipeline_status
return rag

View file

@ -12,7 +12,6 @@ import nest_asyncio
nest_asyncio.apply()
from lightrag.kg.shared_storage import initialize_pipeline_status
# Configure working directory
WORKING_DIR = "./index_default"
@ -107,9 +106,7 @@ async def initialize_rag():
),
)
await rag.initialize_storages()
await initialize_pipeline_status()
await rag.initialize_storages() # Auto-initializes pipeline_status
return rag

View file

@ -5,7 +5,6 @@ from lightrag.llm.lmdeploy import lmdeploy_model_if_cache
from lightrag.llm.hf import hf_embed
from lightrag.utils import EmbeddingFunc
from transformers import AutoModel, AutoTokenizer
from lightrag.kg.shared_storage import initialize_pipeline_status
import asyncio
import nest_asyncio
@ -62,9 +61,7 @@ async def initialize_rag():
),
)
await rag.initialize_storages()
await initialize_pipeline_status()
await rag.initialize_storages() # Auto-initializes pipeline_status
return rag

View file

@ -9,7 +9,6 @@ from lightrag.llm import (
)
from lightrag.utils import EmbeddingFunc
import numpy as np
from lightrag.kg.shared_storage import initialize_pipeline_status
# for custom llm_model_func
from lightrag.utils import locate_json_string_body_from_string
@ -115,9 +114,7 @@ async def initialize_rag():
),
)
await rag.initialize_storages()
await initialize_pipeline_status()
await rag.initialize_storages() # Auto-initializes pipeline_status
return rag

View file

@ -3,7 +3,6 @@ import asyncio
from lightrag import LightRAG, QueryParam
from lightrag.llm.ollama import ollama_embed, openai_complete_if_cache
from lightrag.utils import EmbeddingFunc
from lightrag.kg.shared_storage import initialize_pipeline_status
# WorkingDir
ROOT_DIR = os.path.dirname(os.path.abspath(__file__))
@ -66,9 +65,7 @@ async def initialize_rag():
doc_status_storage="RedisKVStorage",
)
await rag.initialize_storages()
await initialize_pipeline_status()
await rag.initialize_storages() # Auto-initializes pipeline_status
return rag

View file

@ -3,17 +3,10 @@
Diagnostic tool to check LightRAG initialization status.
This tool helps developers verify that their LightRAG instance is properly
initialized and ready to use. It should be called AFTER initialize_storages()
to validate that all components are correctly set up.
initialized before use, preventing common initialization errors.
Usage:
# Basic usage in your code:
rag = LightRAG(...)
await rag.initialize_storages()
await check_lightrag_setup(rag, verbose=True)
# Run demo from command line:
python -m lightrag.tools.check_initialization --demo
python -m lightrag.tools.check_initialization
"""
import asyncio
@ -89,11 +82,11 @@ async def check_lightrag_setup(rag_instance: LightRAG, verbose: bool = False) ->
try:
from lightrag.kg.shared_storage import get_namespace_data
get_namespace_data("pipeline_status", workspace=rag_instance.workspace)
get_namespace_data("pipeline_status")
print("✅ Pipeline status: INITIALIZED")
except KeyError:
issues.append(
"Pipeline status not initialized - call rag.initialize_storages() first"
"Pipeline status not initialized - call initialize_pipeline_status()"
)
except Exception as e:
issues.append(f"Error checking pipeline status: {str(e)}")
@ -108,6 +101,7 @@ async def check_lightrag_setup(rag_instance: LightRAG, verbose: bool = False) ->
print("\n📝 To fix, run this initialization sequence:\n")
print(" await rag.initialize_storages()")
print(" from lightrag.kg.shared_storage import initialize_pipeline_status")
print(
"\n📚 Documentation: https://github.com/HKUDS/LightRAG#important-initialization-requirements"
)
@ -144,10 +138,13 @@ async def demo():
llm_model_func=gpt_4o_mini_complete,
)
print("\n🔄 Initializing storages...\n")
await rag.initialize_storages() # Auto-initializes pipeline_status
print("\n🔴 BEFORE initialization:\n")
await check_lightrag_setup(rag, verbose=True)
print("\n🔍 Checking initialization status:\n")
print("\n" + "=" * 50)
print("\n🔄 Initializing...\n")
await rag.initialize_storages() # Auto-initializes pipeline_status
print("\n🟢 AFTER initialization:\n")
await check_lightrag_setup(rag, verbose=True)
# Cleanup

48
reproduce/Step_1.py Normal file
View file

@ -0,0 +1,48 @@
import os
import json
import time
import asyncio
from lightrag import LightRAG
def insert_text(rag, file_path):
with open(file_path, mode="r") as f:
unique_contexts = json.load(f)
retries = 0
max_retries = 3
while retries < max_retries:
try:
rag.insert(unique_contexts)
break
except Exception as e:
retries += 1
print(f"Insertion failed, retrying ({retries}/{max_retries}), error: {e}")
time.sleep(10)
if retries == max_retries:
print("Insertion failed after exceeding the maximum number of retries")
cls = "agriculture"
WORKING_DIR = f"../{cls}"
if not os.path.exists(WORKING_DIR):
os.mkdir(WORKING_DIR)
async def initialize_rag():
rag = LightRAG(working_dir=WORKING_DIR)
await rag.initialize_storages() # Auto-initializes pipeline_status
return rag
def main():
# Initialize RAG instance
rag = asyncio.run(initialize_rag())
insert_text(rag, f"../datasets/unique_contexts/{cls}_unique_contexts.json")
if __name__ == "__main__":
main()

View file

@ -0,0 +1,83 @@
import os
import json
import time
import asyncio
import numpy as np
from lightrag import LightRAG
from lightrag.utils import EmbeddingFunc
from lightrag.llm.openai import openai_complete_if_cache, openai_embed
## For Upstage API
# please check if embedding_dim=4096 in lightrag.py and llm.py in lightrag direcotry
async def llm_model_func(
prompt, system_prompt=None, history_messages=[], **kwargs
) -> str:
return await openai_complete_if_cache(
"solar-mini",
prompt,
system_prompt=system_prompt,
history_messages=history_messages,
api_key=os.getenv("UPSTAGE_API_KEY"),
base_url="https://api.upstage.ai/v1/solar",
**kwargs,
)
async def embedding_func(texts: list[str]) -> np.ndarray:
return await openai_embed(
texts,
model="solar-embedding-1-large-query",
api_key=os.getenv("UPSTAGE_API_KEY"),
base_url="https://api.upstage.ai/v1/solar",
)
## /For Upstage API
def insert_text(rag, file_path):
with open(file_path, mode="r") as f:
unique_contexts = json.load(f)
retries = 0
max_retries = 3
while retries < max_retries:
try:
rag.insert(unique_contexts)
break
except Exception as e:
retries += 1
print(f"Insertion failed, retrying ({retries}/{max_retries}), error: {e}")
time.sleep(10)
if retries == max_retries:
print("Insertion failed after exceeding the maximum number of retries")
cls = "mix"
WORKING_DIR = f"../{cls}"
if not os.path.exists(WORKING_DIR):
os.mkdir(WORKING_DIR)
async def initialize_rag():
rag = LightRAG(
working_dir=WORKING_DIR,
llm_model_func=llm_model_func,
embedding_func=EmbeddingFunc(embedding_dim=4096, func=embedding_func),
)
await rag.initialize_storages() # Auto-initializes pipeline_status
return rag
def main():
# Initialize RAG instance
rag = asyncio.run(initialize_rag())
insert_text(rag, f"../datasets/unique_contexts/{cls}_unique_contexts.json")
if __name__ == "__main__":
main()