Merge branch 'main' into using_keyed_lock_for_max_concurrency

Arjun Rao 2025-05-09 21:17:41 +10:00
commit a039257896
11 changed files with 142 additions and 69 deletions


@ -532,20 +532,20 @@ response = rag.query(
)
```
### Custom User Prompt
### User Prompt vs. Query Content
Custom user prompts do not affect the query content; they are only used to instruct the LLM on how to handle the query results. Here is how to use it:
When using LightRAG for content queries, do not mix the content query with output-processing instructions that are unrelated to the query results; combining the two significantly degrades query effectiveness. The `user_prompt` field in Query Param is designed to solve exactly this problem: its content does not take part in the RAG retrieval process; it is only sent to the LLM together with the query results after retrieval, guiding the LLM on how to process them. Here is how to use it:
```python
# Create query parameters
# Create query parameters
query_param = QueryParam(
mode = "hybrid", # 或其他模式:"local"、"global"、"hybrid"、"mix"和"naive"
user_prompt = "Please create the diagram using the Mermaid syntax"
mode = "hybrid", # Other modeslocal, global, hybrid, mix, naive
user_prompt = "如需画图使用mermaid格式节点名称用英文或拼音显示名称用中文",
)
# Query and process
# Query and process
response_default = rag.query(
"Please draw a character relationship diagram for Scrooge",
"请画出 Scrooge 的人物关系图谱",
param=query_param
)
print(response_default)
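# For comparison, a minimal sketch (assuming the same rag instance): the same query
# issued without a user_prompt, so the LLM gets no extra post-processing instruction
plain_param = QueryParam(mode="hybrid")
response_plain = rag.query(
    "Please draw a character relationship diagram for Scrooge",
    param=plain_param,
)
print(response_plain)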


@ -570,15 +570,15 @@ response = rag.query(
</details>
### Custom User Prompt Support
### User Prompt vs. Query
Custom user prompts do not affect the query content; they are only used to instruct the LLM on how to handle the query results. Here's how to use it:
When using LightRAG for content queries, avoid combining the search process with unrelated output processing, as this significantly impacts query effectiveness. The `user_prompt` parameter in Query Param is specifically designed to address this issue — it does not participate in the RAG retrieval phase, but rather guides the LLM on how to process the retrieved results after the query is completed. Here's how to use it:
```python
# Create query parameters
query_param = QueryParam(
mode = "hybrid", # 或其他模式:"local"、"global"、"hybrid"、"mix"和"naive"
user_prompt = "Please create the diagram using the Mermaid syntax"
mode = "hybrid", # Other modeslocal, global, hybrid, mix, naive
user_prompt = "For diagrams, use mermaid format with English/Pinyin node names and Chinese display labels",
)
# Query and process


@ -20,4 +20,4 @@ user = your_username
password = your_password
database = your_database
workspace = default # Optional, defaults to default
max_connections = 12
max_connections = 12


@ -131,6 +131,7 @@ POSTGRES_PORT=5432
POSTGRES_USER=your_username
POSTGRES_PASSWORD='your_password'
POSTGRES_DATABASE=your_database
POSTGRES_MAX_CONNECTIONS=12
### separating all data from different LightRAG instances (deprecated)
# POSTGRES_WORKSPACE=default
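Values read from a `.env` file always arrive as strings, so a loader along these lines (a rough sketch; the `postgres_config_from_env` helper and its variable-to-key mapping are assumptions, not code from this PR) still relies on the numeric cast that the PostgreSQL client applies further down in this diff:
```python
import os

def postgres_config_from_env() -> dict:
    # Hypothetical helper: collect the POSTGRES_* variables shown above into the
    # config dict shape consumed by PostgreSQLDB. Every value is still a string here;
    # max_connections is only cast to int inside the database class.
    return {
        "user": os.environ.get("POSTGRES_USER"),
        "password": os.environ.get("POSTGRES_PASSWORD"),
        "database": os.environ.get("POSTGRES_DATABASE"),
        "workspace": os.environ.get("POSTGRES_WORKSPACE", "default"),
        "max_connections": os.environ.get("POSTGRES_MAX_CONNECTIONS", "12"),
    }
```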


@ -202,6 +202,15 @@ Open WebUI uses the LLM to perform conversation title and conversation keyword generation tasks. Therefore
"/context" 也不是 LightRAG 查询模式,它会告诉 LightRAG 只返回为 LLM 准备的上下文信息。您可以检查上下文是否符合您的需求,或者自行处理上下文。
### 在聊天中添加用户提示词
使用LightRAG进行内容查询时应避免将搜索过程与无关的输出处理相结合这会显著影响查询效果。用户提示user prompt正是为解决这一问题而设计 -- 它不参与RAG检索阶段而是在查询完成后指导大语言模型LLM如何处理检索结果。我们可以在查询前缀末尾添加方括号从而向LLM传递用户提示词
```
/[Use mermaid format for diagrams] Please draw a character relationship diagram for Scrooge
/mix[Use mermaid format for diagrams] Please draw a character relationship diagram for Scrooge
```
## API Key and Authentication
By default, the LightRAG Server can be accessed without any authentication. We can configure the server with an API Key or account credentials to secure it.


@ -204,6 +204,15 @@ For example, the chat message `/mix What's LightRAG?` will trigger a mix mode qu
`/context` is also not a LightRAG query mode; it will tell LightRAG to return only the context information prepared for the LLM. You can check the context if it's what you want, or process the context by yourself.
### Add user prompt in chat
When using LightRAG for content queries, avoid combining the search process with unrelated output processing, as this significantly impacts query effectiveness. User prompt is specifically designed to address this issue — it does not participate in the RAG retrieval phase, but rather guides the LLM on how to process the retrieved results after the query is completed. We can append square brackets to the query prefix to provide the LLM with the user prompt:
```
/[Use mermaid format for diagrams] Please draw a character relationship diagram for Scrooge
/mix[Use mermaid format for diagrams] Please draw a character relationship diagram for Scrooge
```
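For a rough picture of how such a prefix can be pulled apart, here is a self-contained sketch (a simplified illustration only, not the server's actual `parse_query_mode`; the function name `split_chat_prefix` is made up):
```python
import re

def split_chat_prefix(message: str) -> tuple[str, str | None, str]:
    """Return (mode_prefix, user_prompt, remaining_query) for messages like
    '/mix[Use mermaid format for diagrams] Please draw ...'."""
    match = re.match(r"^/([a-z]*)\[(.*?)\](.*)", message)
    if match:
        return match.group(1), match.group(2), match.group(3).lstrip()
    return "", None, message

print(split_chat_prefix("/mix[Use mermaid format for diagrams] Please draw a character relationship diagram for Scrooge"))
# -> ('mix', 'Use mermaid format for diagrams', 'Please draw a character relationship diagram for Scrooge')
```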
## API Key and Authentication
By default, the LightRAG Server can be accessed without any authentication. We can configure the server with an API Key or account credentials to secure it.


@ -101,10 +101,31 @@ def estimate_tokens(text: str) -> int:
return len(tokens)
def parse_query_mode(query: str) -> tuple[str, SearchMode, bool]:
def parse_query_mode(query: str) -> tuple[str, SearchMode, bool, Optional[str]]:
"""Parse query prefix to determine search mode
Returns tuple of (cleaned_query, search_mode, only_need_context)
Returns tuple of (cleaned_query, search_mode, only_need_context, user_prompt)
Examples:
- "/local[use mermaid format for diagrams] query string" -> (cleaned_query, SearchMode.local, False, "use mermaid format for diagrams")
- "/[use mermaid format for diagrams] query string" -> (cleaned_query, SearchMode.hybrid, False, "use mermaid format for diagrams")
- "/local query string" -> (cleaned_query, SearchMode.local, False, None)
"""
# Initialize user_prompt as None
user_prompt = None
# First check if there's a bracket format for user prompt
bracket_pattern = r"^/([a-z]*)\[(.*?)\](.*)"
bracket_match = re.match(bracket_pattern, query)
if bracket_match:
mode_prefix = bracket_match.group(1)
user_prompt = bracket_match.group(2)
remaining_query = bracket_match.group(3).lstrip()
# Reconstruct query, removing the bracket part
query = f"/{mode_prefix} {remaining_query}".strip()
# Unified handling of mode and only_need_context determination
mode_map = {
"/local ": (SearchMode.local, False),
"/global ": (
@ -128,11 +149,11 @@ def parse_query_mode(query: str) -> tuple[str, SearchMode, bool]:
for prefix, (mode, only_need_context) in mode_map.items():
if query.startswith(prefix):
# After removing prefix an leading spaces
# After removing prefix and leading spaces
cleaned_query = query[len(prefix) :].lstrip()
return cleaned_query, mode, only_need_context
return cleaned_query, mode, only_need_context, user_prompt
return query, SearchMode.hybrid, False
return query, SearchMode.hybrid, False, user_prompt
class OllamaAPI:
@ -362,7 +383,9 @@ class OllamaAPI:
]
# Check for query prefix
cleaned_query, mode, only_need_context = parse_query_mode(query)
cleaned_query, mode, only_need_context, user_prompt = parse_query_mode(
query
)
start_time = time.time_ns()
prompt_tokens = estimate_tokens(cleaned_query)
@ -375,6 +398,10 @@ class OllamaAPI:
"top_k": self.top_k,
}
# Add user_prompt to param_dict
if user_prompt is not None:
param_dict["user_prompt"] = user_prompt
if (
hasattr(self.rag, "args")
and self.rag.args.history_turns is not None
@ -524,7 +551,7 @@ class OllamaAPI:
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"Content-Type": "application/x-ndjson",
"X-Accel-Buffering": "no", # 确保在Nginx代理时正确处理流式响应
"X-Accel-Buffering": "no", # Ensure proper handling of streaming responses in Nginx proxy
},
)
else:


@ -55,7 +55,7 @@ class PostgreSQLDB:
self.password = config.get("password", None)
self.database = config.get("database", "postgres")
self.workspace = config.get("workspace", "default")
self.max = config.get("max_connections", 12)
self.max = int(config.get("max_connections", 12))
self.increment = 1
self.pool: Pool | None = None
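The new `int(...)` cast matters because values sourced from `config.ini` or environment variables are strings, while a pool constructor expects an integer size. A minimal sketch of that boundary, assuming an asyncpg-style pool as the `Pool` annotation above suggests (this is not the class's actual initialization code, and the host/port keys and defaults are placeholders):
```python
import asyncpg

async def open_pool(config: dict) -> asyncpg.Pool:
    # Cast string-typed config values before they reach asyncpg,
    # which expects integers for port and max_size.
    return await asyncpg.create_pool(
        user=config.get("user"),
        password=config.get("password"),
        database=config.get("database", "postgres"),
        host=config.get("host", "localhost"),
        port=int(config.get("port", 5432)),
        min_size=1,
        max_size=int(config.get("max_connections", 12)),
    )
```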


@ -177,28 +177,32 @@ async def openai_complete_if_cache(
logger.debug("===== Sending Query to LLM =====")
try:
async with openai_async_client:
if "response_format" in kwargs:
response = await openai_async_client.beta.chat.completions.parse(
model=model, messages=messages, **kwargs
)
else:
response = await openai_async_client.chat.completions.create(
model=model, messages=messages, **kwargs
)
# Don't use async with context manager, use client directly
if "response_format" in kwargs:
response = await openai_async_client.beta.chat.completions.parse(
model=model, messages=messages, **kwargs
)
else:
response = await openai_async_client.chat.completions.create(
model=model, messages=messages, **kwargs
)
except APIConnectionError as e:
logger.error(f"OpenAI API Connection Error: {e}")
await openai_async_client.close() # Ensure client is closed
raise
except RateLimitError as e:
logger.error(f"OpenAI API Rate Limit Error: {e}")
await openai_async_client.close() # Ensure client is closed
raise
except APITimeoutError as e:
logger.error(f"OpenAI API Timeout Error: {e}")
await openai_async_client.close() # Ensure client is closed
raise
except Exception as e:
logger.error(
f"OpenAI API Call Failed,\nModel: {model},\nParams: {kwargs}, Got: {e}"
)
await openai_async_client.close() # Ensure client is closed
raise
if hasattr(response, "__aiter__"):
@ -243,6 +247,8 @@ async def openai_complete_if_cache(
logger.warning(
f"Failed to close stream response: {close_error}"
)
# Ensure client is closed in case of exception
await openai_async_client.close()
raise
finally:
# Ensure resources are released even if no exception occurs
@ -258,40 +264,50 @@ async def openai_complete_if_cache(
logger.warning(
f"Failed to close stream response in finally block: {close_error}"
)
# Note: We don't close the client here for streaming responses
# The client will be closed by the caller after streaming is complete
return inner()
else:
if (
not response
or not response.choices
or not hasattr(response.choices[0], "message")
or not hasattr(response.choices[0].message, "content")
):
logger.error("Invalid response from OpenAI API")
raise InvalidResponseError("Invalid response from OpenAI API")
try:
if (
not response
or not response.choices
or not hasattr(response.choices[0], "message")
or not hasattr(response.choices[0].message, "content")
):
logger.error("Invalid response from OpenAI API")
await openai_async_client.close() # Ensure client is closed
raise InvalidResponseError("Invalid response from OpenAI API")
content = response.choices[0].message.content
content = response.choices[0].message.content
if not content or content.strip() == "":
logger.error("Received empty content from OpenAI API")
raise InvalidResponseError("Received empty content from OpenAI API")
if not content or content.strip() == "":
logger.error("Received empty content from OpenAI API")
await openai_async_client.close() # Ensure client is closed
raise InvalidResponseError("Received empty content from OpenAI API")
if r"\u" in content:
content = safe_unicode_decode(content.encode("utf-8"))
if r"\u" in content:
content = safe_unicode_decode(content.encode("utf-8"))
if token_tracker and hasattr(response, "usage"):
token_counts = {
"prompt_tokens": getattr(response.usage, "prompt_tokens", 0),
"completion_tokens": getattr(response.usage, "completion_tokens", 0),
"total_tokens": getattr(response.usage, "total_tokens", 0),
}
token_tracker.add_usage(token_counts)
if token_tracker and hasattr(response, "usage"):
token_counts = {
"prompt_tokens": getattr(response.usage, "prompt_tokens", 0),
"completion_tokens": getattr(
response.usage, "completion_tokens", 0
),
"total_tokens": getattr(response.usage, "total_tokens", 0),
}
token_tracker.add_usage(token_counts)
logger.debug(f"Response content len: {len(content)}")
verbose_debug(f"Response: {response}")
logger.debug(f"Response content len: {len(content)}")
verbose_debug(f"Response: {response}")
return content
return content
finally:
# Ensure client is closed in all cases for non-streaming responses
await openai_async_client.close()
async def openai_complete(
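The net effect of the changes above: non-streaming calls now own the client and close it on every exit path (the `finally`), while streaming calls deliberately leave it open for the consumer of the iterator to close once the stream is drained. A minimal self-contained sketch of the non-streaming side, assuming an `AsyncOpenAI` client and a placeholder model name:
```python
from openai import AsyncOpenAI

async def complete_once(messages: list[dict], model: str = "gpt-4o-mini") -> str:
    # Non-streaming path: this function owns the client, so it closes it on every exit path.
    client = AsyncOpenAI()  # reads OPENAI_API_KEY from the environment
    try:
        response = await client.chat.completions.create(model=model, messages=messages)
        return response.choices[0].message.content
    finally:
        await client.close()
```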


@ -218,7 +218,11 @@ async def _handle_single_relationship_extraction(
edge_description = clean_str(record_attributes[3])
edge_description = normalize_extracted_info(edge_description)
edge_keywords = clean_str(record_attributes[4]).strip('"').strip("'")
edge_keywords = normalize_extracted_info(
clean_str(record_attributes[4]), is_entity=True
)
edge_keywords = edge_keywords.replace("，", ",")
edge_source_id = chunk_key
weight = (
float(record_attributes[-1].strip('"').strip("'"))
@ -388,14 +392,22 @@ async def _merge_edges_then_upsert(
)
)
)
keywords = GRAPH_FIELD_SEP.join(
sorted(
set(
[dp["keywords"] for dp in edges_data if dp.get("keywords")]
+ already_keywords
# Split all existing and new keywords into individual terms, then combine and deduplicate
all_keywords = set()
# Process already_keywords (which are comma-separated)
for keyword_str in already_keywords:
if keyword_str: # Skip empty strings
all_keywords.update(k.strip() for k in keyword_str.split(",") if k.strip())
# Process new keywords from edges_data
for edge in edges_data:
if edge.get("keywords"):
all_keywords.update(
k.strip() for k in edge["keywords"].split(",") if k.strip()
)
)
)
# Join all unique keywords with commas
keywords = ",".join(sorted(all_keywords))
source_id = GRAPH_FIELD_SEP.join(
set(
[dp["source_id"] for dp in edges_data if dp.get("source_id")]
@ -541,7 +553,6 @@ async def merge_nodes_and_edges(
total_relations_count = len(all_edges)
# Merge nodes and edges
# Use graph database lock to ensure atomic merges and updates
async with pipeline_status_lock:
log_message = (
f"Merging stage {current_file_number}/{total_files}: {file_path}"
@ -1309,19 +1320,19 @@ async def _build_query_context(
relations_str = json.dumps(relations_context, ensure_ascii=False)
text_units_str = json.dumps(text_units_context, ensure_ascii=False)
result = f"""-----Entities-----
result = f"""-----Entities(KG)-----
```json
{entities_str}
```
-----Relationships-----
-----Relationships(KG)-----
```json
{relations_str}
```
-----Sources-----
-----Document Chunks(DC)-----
```json
{text_units_str}


@ -199,7 +199,7 @@ PROMPTS["fail_response"] = (
PROMPTS["rag_response"] = """---Role---
You are a helpful assistant responding to user query about Knowledge Base provided below.
You are a helpful assistant responding to user query about Knowledge Graph and Document Chunks provided in JSON format below.
---Goal---
@ -215,7 +215,7 @@ When handling relationships with timestamps:
---Conversation History---
{history}
---Knowledge Base---
---Knowledge Graph and Document Chunks---
{context_data}
---Response Rules---
@ -224,7 +224,7 @@ When handling relationships with timestamps:
- Use markdown formatting with appropriate section headings
- Please respond in the same language as the user's question.
- Ensure the response maintains continuity with the conversation history.
- List up to 5 most important reference sources at the end under "References" section. Clearly indicating whether each source is from Knowledge Graph (KG) or Vector Data (DC), and include the file path if available, in the following format: [KG/DC] file_path
- List up to 5 most important reference sources at the end under "References" section. Clearly indicating whether each source is from Knowledge Graph (KG) or Document Chunks (DC), and include the file path if available, in the following format: [KG/DC] file_path
- If you don't know the answer, just say so.
- Do not make anything up. Do not include information not provided by the Knowledge Base.
- Additional user prompt: {user_prompt}
@ -300,7 +300,7 @@ Output:
PROMPTS["naive_rag_response"] = """---Role---
You are a helpful assistant responding to user query about Document Chunks provided below.
You are a helpful assistant responding to user query about Document Chunks provided in JSON format below.
---Goal---