update from main

This commit is contained in:
zrguo 2025-07-14 16:27:49 +08:00
parent c9cbd2d3e0
commit 1d0376d6a9
11 changed files with 205 additions and 332 deletions

View file

@ -250,7 +250,7 @@ if __name__ == "__main__":
| **embedding_func_max_async** | `int` | 最大并发异步嵌入进程数 | `16` | | **embedding_func_max_async** | `int` | 最大并发异步嵌入进程数 | `16` |
| **llm_model_func** | `callable` | LLM生成的函数 | `gpt_4o_mini_complete` | | **llm_model_func** | `callable` | LLM生成的函数 | `gpt_4o_mini_complete` |
| **llm_model_name** | `str` | 用于生成的LLM模型名称 | `meta-llama/Llama-3.2-1B-Instruct` | | **llm_model_name** | `str` | 用于生成的LLM模型名称 | `meta-llama/Llama-3.2-1B-Instruct` |
| **llm_model_max_token_size** | `int` | 生成实体关系摘要时送给LLM的最大令牌数 | `32000`（默认值由环境变量MAX_TOKENS更改） | | **llm_model_max_token_size** | `int` | LLM生成的最大令牌大小（影响实体关系摘要） | `32768`（默认值由环境变量MAX_TOKENS更改） |
| **llm_model_max_async** | `int` | 最大并发异步LLM进程数 | `4`（默认值由环境变量MAX_ASYNC更改） | | **llm_model_max_async** | `int` | 最大并发异步LLM进程数 | `4`（默认值由环境变量MAX_ASYNC更改） |
| **llm_model_kwargs** | `dict` | LLM生成的附加参数 | | | **llm_model_kwargs** | `dict` | LLM生成的附加参数 | |
| **vector_db_storage_cls_kwargs** | `dict` | 向量数据库的附加参数,如设置节点和关系检索的阈值 | cosine_better_than_threshold: 0.2（默认值由环境变量COSINE_THRESHOLD更改） | | **vector_db_storage_cls_kwargs** | `dict` | 向量数据库的附加参数,如设置节点和关系检索的阈值 | cosine_better_than_threshold: 0.2（默认值由环境变量COSINE_THRESHOLD更改） |

View file

@ -257,7 +257,7 @@ A full list of LightRAG init parameters:
| **embedding_func_max_async** | `int` | Maximum number of concurrent asynchronous embedding processes | `16` | | **embedding_func_max_async** | `int` | Maximum number of concurrent asynchronous embedding processes | `16` |
| **llm_model_func** | `callable` | Function for LLM generation | `gpt_4o_mini_complete` | | **llm_model_func** | `callable` | Function for LLM generation | `gpt_4o_mini_complete` |
| **llm_model_name** | `str` | LLM model name for generation | `meta-llama/Llama-3.2-1B-Instruct` | | **llm_model_name** | `str` | LLM model name for generation | `meta-llama/Llama-3.2-1B-Instruct` |
| **llm_model_max_token_size** | `int` | Maximum tokens sent to LLM to generate entity relation summaries | `32000` (default value changed by env var MAX_TOKENS) | | **llm_model_max_token_size** | `int` | Maximum token size for LLM generation (affects entity relation summaries) | `32768` (default value changed by env var MAX_TOKENS) |
| **llm_model_max_async** | `int` | Maximum number of concurrent asynchronous LLM processes | `4` (default value changed by env var MAX_ASYNC) | | **llm_model_max_async** | `int` | Maximum number of concurrent asynchronous LLM processes | `4` (default value changed by env var MAX_ASYNC) |
| **llm_model_kwargs** | `dict` | Additional parameters for LLM generation | | | **llm_model_kwargs** | `dict` | Additional parameters for LLM generation | |
| **vector_db_storage_cls_kwargs** | `dict` | Additional parameters for vector database, like setting the threshold for nodes and relations retrieval | cosine_better_than_threshold: 0.2 (default value changed by env var COSINE_THRESHOLD) | | **vector_db_storage_cls_kwargs** | `dict` | Additional parameters for vector database, like setting the threshold for nodes and relations retrieval | cosine_better_than_threshold: 0.2 (default value changed by env var COSINE_THRESHOLD) |

View file

@ -46,6 +46,7 @@ OLLAMA_EMULATING_MODEL_TAG=latest
### Chunk size for document splitting, 500~1500 is recommended ### Chunk size for document splitting, 500~1500 is recommended
# CHUNK_SIZE=1200 # CHUNK_SIZE=1200
# CHUNK_OVERLAP_SIZE=100 # CHUNK_OVERLAP_SIZE=100
# MAX_TOKEN_SUMMARY=500
### RAG Query Configuration ### RAG Query Configuration
# HISTORY_TURNS=3 # HISTORY_TURNS=3
@ -93,7 +94,8 @@ TEMPERATURE=0
### Max concurrency requests of LLM ### Max concurrency requests of LLM
MAX_ASYNC=4 MAX_ASYNC=4
### MAX_TOKENS: max tokens sent to LLM for entity relation summaries (less than context size of the model) ### MAX_TOKENS: max tokens sent to LLM for entity relation summaries (less than context size of the model)
MAX_TOKENS=32000 ### MAX_TOKENS: set as num_ctx option for Ollama by API Server
MAX_TOKENS=32768
### LLM Binding type: openai, ollama, lollms, azure_openai ### LLM Binding type: openai, ollama, lollms, azure_openai
LLM_BINDING=openai LLM_BINDING=openai
LLM_MODEL=gpt-4o LLM_MODEL=gpt-4o
@ -102,8 +104,6 @@ LLM_BINDING_API_KEY=your_api_key
### Optional for Azure ### Optional for Azure
# AZURE_OPENAI_API_VERSION=2024-08-01-preview # AZURE_OPENAI_API_VERSION=2024-08-01-preview
# AZURE_OPENAI_DEPLOYMENT=gpt-4o # AZURE_OPENAI_DEPLOYMENT=gpt-4o
### set as num_ctx option for Ollama LLM
# OLLAMA_NUM_CTX=32768
### Embedding Configuration ### Embedding Configuration
### Embedding Binding type: openai, ollama, lollms, azure_openai ### Embedding Binding type: openai, ollama, lollms, azure_openai
@ -116,7 +116,7 @@ EMBEDDING_BINDING_HOST=http://localhost:11434
### Number of chunks sent to Embedding in a single request ### Number of chunks sent to Embedding in a single request
# EMBEDDING_BATCH_NUM=10 # EMBEDDING_BATCH_NUM=10
### Max concurrency requests for Embedding ### Max concurrency requests for Embedding
# EMBEDDING_FUNC_MAX_ASYNC=8 # EMBEDDING_FUNC_MAX_ASYNC=16
### Maximum tokens sent to Embedding for each chunk (no longer in use?) ### Maximum tokens sent to Embedding for each chunk (no longer in use?)
# MAX_EMBED_TOKENS=8192 # MAX_EMBED_TOKENS=8192
### Optional for Azure ### Optional for Azure

View file

@ -37,7 +37,6 @@ from .base import (
) )
from .prompt import PROMPTS from .prompt import PROMPTS
from .constants import GRAPH_FIELD_SEP from .constants import GRAPH_FIELD_SEP
from .kg.shared_storage import get_storage_keyed_lock
import time import time
from dotenv import load_dotenv from dotenv import load_dotenv
@ -118,7 +117,7 @@ async def _handle_entity_relation_summary(
tokenizer: Tokenizer = global_config["tokenizer"] tokenizer: Tokenizer = global_config["tokenizer"]
llm_max_tokens = global_config["llm_model_max_token_size"] llm_max_tokens = global_config["llm_model_max_token_size"]
# summary_max_tokens = global_config["summary_to_max_tokens"] summary_max_tokens = global_config["summary_to_max_tokens"]
language = global_config["addon_params"].get( language = global_config["addon_params"].get(
"language", PROMPTS["DEFAULT_LANGUAGE"] "language", PROMPTS["DEFAULT_LANGUAGE"]
@ -145,7 +144,7 @@ async def _handle_entity_relation_summary(
use_prompt, use_prompt,
use_llm_func, use_llm_func,
llm_response_cache=llm_response_cache, llm_response_cache=llm_response_cache,
# max_tokens=summary_max_tokens, max_tokens=summary_max_tokens,
cache_type="extract", cache_type="extract",
) )
return summary return summary
@ -275,26 +274,20 @@ async def _rebuild_knowledge_from_chunks(
pipeline_status: dict | None = None, pipeline_status: dict | None = None,
pipeline_status_lock=None, pipeline_status_lock=None,
) -> None: ) -> None:
"""Rebuild entity and relationship descriptions from cached extraction results with parallel processing """Rebuild entity and relationship descriptions from cached extraction results
This method uses cached LLM extraction results instead of calling LLM again, This method uses cached LLM extraction results instead of calling LLM again,
following the same approach as the insert process. Now with parallel processing following the same approach as the insert process.
controlled by llm_model_max_async and using get_storage_keyed_lock for data consistency.
Args: Args:
entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids
relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids
knowledge_graph_inst: Knowledge graph storage text_chunks_data: Pre-loaded chunk data dict {chunk_id: chunk_data}
entities_vdb: Entity vector database
relationships_vdb: Relationship vector database
text_chunks_storage: Text chunks storage
llm_response_cache: LLM response cache
global_config: Global configuration containing llm_model_max_async
pipeline_status: Pipeline status dictionary
pipeline_status_lock: Lock for pipeline status
""" """
if not entities_to_rebuild and not relationships_to_rebuild: if not entities_to_rebuild and not relationships_to_rebuild:
return return
rebuilt_entities_count = 0
rebuilt_relationships_count = 0
# Get all referenced chunk IDs # Get all referenced chunk IDs
all_referenced_chunk_ids = set() all_referenced_chunk_ids = set()
@ -303,7 +296,7 @@ async def _rebuild_knowledge_from_chunks(
for chunk_ids in relationships_to_rebuild.values(): for chunk_ids in relationships_to_rebuild.values():
all_referenced_chunk_ids.update(chunk_ids) all_referenced_chunk_ids.update(chunk_ids)
status_message = f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions (parallel processing)" status_message = f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions"
logger.info(status_message) logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None: if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock: async with pipeline_status_lock:
@ -373,116 +366,66 @@ async def _rebuild_knowledge_from_chunks(
pipeline_status["history_messages"].append(status_message) pipeline_status["history_messages"].append(status_message)
continue continue
# Get max async tasks limit from global_config for semaphore control # Rebuild entities
graph_max_async = global_config.get("llm_model_max_async", 4) * 2
semaphore = asyncio.Semaphore(graph_max_async)
# Counters for tracking progress
rebuilt_entities_count = 0
rebuilt_relationships_count = 0
failed_entities_count = 0
failed_relationships_count = 0
async def _locked_rebuild_entity(entity_name, chunk_ids):
nonlocal rebuilt_entities_count, failed_entities_count
async with semaphore:
workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
[entity_name], namespace=namespace, enable_logging=False
):
try:
await _rebuild_single_entity(
knowledge_graph_inst=knowledge_graph_inst,
entities_vdb=entities_vdb,
entity_name=entity_name,
chunk_ids=chunk_ids,
chunk_entities=chunk_entities,
llm_response_cache=llm_response_cache,
global_config=global_config,
)
rebuilt_entities_count += 1
status_message = (
f"Rebuilt entity: {entity_name} from {len(chunk_ids)} chunks"
)
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
except Exception as e:
failed_entities_count += 1
status_message = f"Failed to rebuild entity {entity_name}: {e}"
logger.info(status_message) # Per requirement, change to info
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
async def _locked_rebuild_relationship(src, tgt, chunk_ids):
nonlocal rebuilt_relationships_count, failed_relationships_count
async with semaphore:
workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
f"{src}-{tgt}", namespace=namespace, enable_logging=False
):
try:
await _rebuild_single_relationship(
knowledge_graph_inst=knowledge_graph_inst,
relationships_vdb=relationships_vdb,
src=src,
tgt=tgt,
chunk_ids=chunk_ids,
chunk_relationships=chunk_relationships,
llm_response_cache=llm_response_cache,
global_config=global_config,
)
rebuilt_relationships_count += 1
status_message = f"Rebuilt relationship: {src}->{tgt} from {len(chunk_ids)} chunks"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
except Exception as e:
failed_relationships_count += 1
status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}"
logger.info(status_message) # Per requirement, change to info
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
# Create tasks for parallel processing
tasks = []
# Add entity rebuilding tasks
for entity_name, chunk_ids in entities_to_rebuild.items(): for entity_name, chunk_ids in entities_to_rebuild.items():
task = asyncio.create_task(_locked_rebuild_entity(entity_name, chunk_ids)) try:
tasks.append(task) await _rebuild_single_entity(
knowledge_graph_inst=knowledge_graph_inst,
entities_vdb=entities_vdb,
entity_name=entity_name,
chunk_ids=chunk_ids,
chunk_entities=chunk_entities,
llm_response_cache=llm_response_cache,
global_config=global_config,
)
rebuilt_entities_count += 1
status_message = (
f"Rebuilt entity: {entity_name} from {len(chunk_ids)} chunks"
)
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
except Exception as e:
status_message = f"Failed to rebuild entity {entity_name}: {e}"
logger.info(status_message) # Per requirement, change to info
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
# Add relationship rebuilding tasks # Rebuild relationships
for (src, tgt), chunk_ids in relationships_to_rebuild.items(): for (src, tgt), chunk_ids in relationships_to_rebuild.items():
task = asyncio.create_task(_locked_rebuild_relationship(src, tgt, chunk_ids)) try:
tasks.append(task) await _rebuild_single_relationship(
knowledge_graph_inst=knowledge_graph_inst,
# Log parallel processing start relationships_vdb=relationships_vdb,
status_message = f"Starting parallel rebuild of {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships (async: {graph_max_async})" src=src,
logger.info(status_message) tgt=tgt,
if pipeline_status is not None and pipeline_status_lock is not None: chunk_ids=chunk_ids,
async with pipeline_status_lock: chunk_relationships=chunk_relationships,
pipeline_status["latest_message"] = status_message llm_response_cache=llm_response_cache,
pipeline_status["history_messages"].append(status_message) global_config=global_config,
)
# Execute all tasks in parallel with semaphore control rebuilt_relationships_count += 1
await asyncio.gather(*tasks) status_message = (
f"Rebuilt relationship: {src}->{tgt} from {len(chunk_ids)} chunks"
# Final status report )
status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships rebuilt successfully." logger.info(status_message)
if failed_entities_count > 0 or failed_relationships_count > 0: if pipeline_status is not None and pipeline_status_lock is not None:
status_message += f" Failed: {failed_entities_count} entities, {failed_relationships_count} relationships." async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
except Exception as e:
status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships."
logger.info(status_message) logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None: if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock: async with pipeline_status_lock:
@ -687,10 +630,7 @@ async def _rebuild_single_entity(
# Helper function to generate final description with optional LLM summary # Helper function to generate final description with optional LLM summary
async def _generate_final_description(combined_description: str) -> str: async def _generate_final_description(combined_description: str) -> str:
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] if len(combined_description) > global_config["summary_to_max_tokens"]:
num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1
if num_fragment >= force_llm_summary_on_merge:
return await _handle_entity_relation_summary( return await _handle_entity_relation_summary(
entity_name, entity_name,
combined_description, combined_description,
@ -785,11 +725,7 @@ async def _rebuild_single_relationship(
llm_response_cache: BaseKVStorage, llm_response_cache: BaseKVStorage,
global_config: dict[str, str], global_config: dict[str, str],
) -> None: ) -> None:
"""Rebuild a single relationship from cached extraction results """Rebuild a single relationship from cached extraction results"""
Note: This function assumes the caller has already acquired the appropriate
keyed lock for the relationship pair to ensure thread safety.
"""
# Get current relationship data # Get current relationship data
current_relationship = await knowledge_graph_inst.get_edge(src, tgt) current_relationship = await knowledge_graph_inst.get_edge(src, tgt)
@ -845,11 +781,8 @@ async def _rebuild_single_relationship(
# ) # )
weight = sum(weights) if weights else current_relationship.get("weight", 1.0) weight = sum(weights) if weights else current_relationship.get("weight", 1.0)
# Use summary if description has too many fragments # Use summary if description is too long
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] if len(combined_description) > global_config["summary_to_max_tokens"]:
num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1
if num_fragment >= force_llm_summary_on_merge:
final_description = await _handle_entity_relation_summary( final_description = await _handle_entity_relation_summary(
f"{src}-{tgt}", f"{src}-{tgt}",
combined_description, combined_description,
@ -1082,23 +1015,28 @@ async def _merge_edges_then_upsert(
) )
for need_insert_id in [src_id, tgt_id]: for need_insert_id in [src_id, tgt_id]:
workspace = global_config.get("workspace", "") if not (await knowledge_graph_inst.has_node(need_insert_id)):
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" # # Discard this edge if the node does not exist
async with get_storage_keyed_lock( # if need_insert_id == src_id:
[need_insert_id], namespace=namespace, enable_logging=False # logger.warning(
): # f"Discard edge: {src_id} - {tgt_id} | Source node missing"
if not (await knowledge_graph_inst.has_node(need_insert_id)): # )
await knowledge_graph_inst.upsert_node( # else:
need_insert_id, # logger.warning(
node_data={ # f"Discard edge: {src_id} - {tgt_id} | Target node missing"
"entity_id": need_insert_id, # )
"source_id": source_id, # return None
"description": description, await knowledge_graph_inst.upsert_node(
"entity_type": "UNKNOWN", need_insert_id,
"file_path": file_path, node_data={
"created_at": int(time.time()), "entity_id": need_insert_id,
}, "source_id": source_id,
) "description": description,
"entity_type": "UNKNOWN",
"file_path": file_path,
"created_at": int(time.time()),
},
)
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
@ -1180,6 +1118,8 @@ async def merge_nodes_and_edges(
pipeline_status_lock: Lock for pipeline status pipeline_status_lock: Lock for pipeline status
llm_response_cache: LLM response cache llm_response_cache: LLM response cache
""" """
# Get lock manager from shared storage
from .kg.shared_storage import get_graph_db_lock
# Collect all nodes and edges from all chunks # Collect all nodes and edges from all chunks
all_nodes = defaultdict(list) all_nodes = defaultdict(list)
@ -1196,109 +1136,94 @@ async def merge_nodes_and_edges(
all_edges[sorted_edge_key].extend(edges) all_edges[sorted_edge_key].extend(edges)
# Centralized processing of all nodes and edges # Centralized processing of all nodes and edges
total_entities_count = len(all_nodes) entities_data = []
total_relations_count = len(all_edges) relationships_data = []
# Merge nodes and edges # Merge nodes and edges
log_message = f"Merging stage {current_file_number}/{total_files}: {file_path}" # Use graph database lock to ensure atomic merges and updates
logger.info(log_message) graph_db_lock = get_graph_db_lock(enable_logging=False)
async with pipeline_status_lock: async with graph_db_lock:
pipeline_status["latest_message"] = log_message async with pipeline_status_lock:
pipeline_status["history_messages"].append(log_message) log_message = (
f"Merging stage {current_file_number}/{total_files}: {file_path}"
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# Get max async tasks limit from global_config for semaphore control # Process and update all entities at once
graph_max_async = global_config.get("llm_model_max_async", 4) * 2 for entity_name, entities in all_nodes.items():
semaphore = asyncio.Semaphore(graph_max_async) entity_data = await _merge_nodes_then_upsert(
entity_name,
entities,
knowledge_graph_inst,
global_config,
pipeline_status,
pipeline_status_lock,
llm_response_cache,
)
entities_data.append(entity_data)
# Process and update all entities and relationships in parallel # Process and update all relationships at once
log_message = f"Processing: {total_entities_count} entities and {total_relations_count} relations (async: {graph_max_async})" for edge_key, edges in all_edges.items():
logger.info(log_message) edge_data = await _merge_edges_then_upsert(
async with pipeline_status_lock: edge_key[0],
pipeline_status["latest_message"] = log_message edge_key[1],
pipeline_status["history_messages"].append(log_message) edges,
knowledge_graph_inst,
global_config,
pipeline_status,
pipeline_status_lock,
llm_response_cache,
)
if edge_data is not None:
relationships_data.append(edge_data)
async def _locked_process_entity_name(entity_name, entities): # Update total counts
async with semaphore: total_entities_count = len(entities_data)
workspace = global_config.get("workspace", "") total_relations_count = len(relationships_data)
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
[entity_name], namespace=namespace, enable_logging=False
):
entity_data = await _merge_nodes_then_upsert(
entity_name,
entities,
knowledge_graph_inst,
global_config,
pipeline_status,
pipeline_status_lock,
llm_response_cache,
)
if entity_vdb is not None:
data_for_vdb = {
compute_mdhash_id(entity_data["entity_name"], prefix="ent-"): {
"entity_name": entity_data["entity_name"],
"entity_type": entity_data["entity_type"],
"content": f"{entity_data['entity_name']}\n{entity_data['description']}",
"source_id": entity_data["source_id"],
"file_path": entity_data.get("file_path", "unknown_source"),
}
}
await entity_vdb.upsert(data_for_vdb)
return entity_data
async def _locked_process_edges(edge_key, edges): log_message = f"Updating {total_entities_count} entities {current_file_number}/{total_files}: {file_path}"
async with semaphore: logger.info(log_message)
workspace = global_config.get("workspace", "") if pipeline_status is not None:
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB" async with pipeline_status_lock:
async with get_storage_keyed_lock( pipeline_status["latest_message"] = log_message
f"{edge_key[0]}-{edge_key[1]}", pipeline_status["history_messages"].append(log_message)
namespace=namespace,
enable_logging=False,
):
edge_data = await _merge_edges_then_upsert(
edge_key[0],
edge_key[1],
edges,
knowledge_graph_inst,
global_config,
pipeline_status,
pipeline_status_lock,
llm_response_cache,
)
if edge_data is None:
return None
if relationships_vdb is not None: # Update vector databases with all collected data
data_for_vdb = { if entity_vdb is not None and entities_data:
compute_mdhash_id( data_for_vdb = {
edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-" compute_mdhash_id(dp["entity_name"], prefix="ent-"): {
): { "entity_name": dp["entity_name"],
"src_id": edge_data["src_id"], "entity_type": dp["entity_type"],
"tgt_id": edge_data["tgt_id"], "content": f"{dp['entity_name']}\n{dp['description']}",
"keywords": edge_data["keywords"], "source_id": dp["source_id"],
"content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}", "file_path": dp.get("file_path", "unknown_source"),
"source_id": edge_data["source_id"], }
"file_path": edge_data.get("file_path", "unknown_source"), for dp in entities_data
} }
} await entity_vdb.upsert(data_for_vdb)
await relationships_vdb.upsert(data_for_vdb)
return edge_data
# Create a single task queue for both entities and edges log_message = f"Updating {total_relations_count} relations {current_file_number}/{total_files}: {file_path}"
tasks = [] logger.info(log_message)
if pipeline_status is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# Add entity processing tasks if relationships_vdb is not None and relationships_data:
for entity_name, entities in all_nodes.items(): data_for_vdb = {
tasks.append( compute_mdhash_id(dp["src_id"] + dp["tgt_id"], prefix="rel-"): {
asyncio.create_task(_locked_process_entity_name(entity_name, entities)) "src_id": dp["src_id"],
) "tgt_id": dp["tgt_id"],
"keywords": dp["keywords"],
# Add edge processing tasks "content": f"{dp['src_id']}\t{dp['tgt_id']}\n{dp['keywords']}\n{dp['description']}",
for edge_key, edges in all_edges.items(): "source_id": dp["source_id"],
tasks.append(asyncio.create_task(_locked_process_edges(edge_key, edges))) "file_path": dp.get("file_path", "unknown_source"),
}
# Execute all tasks in parallel with semaphore control for dp in relationships_data
await asyncio.gather(*tasks) }
await relationships_vdb.upsert(data_for_vdb)
async def extract_entities( async def extract_entities(
@ -1508,8 +1433,8 @@ async def extract_entities(
return maybe_nodes, maybe_edges return maybe_nodes, maybe_edges
# Get max async tasks limit from global_config # Get max async tasks limit from global_config
chunk_max_async = global_config.get("llm_model_max_async", 4) llm_model_max_async = global_config.get("llm_model_max_async", 4)
semaphore = asyncio.Semaphore(chunk_max_async) semaphore = asyncio.Semaphore(llm_model_max_async)
async def _process_with_semaphore(chunk): async def _process_with_semaphore(chunk):
async with semaphore: async with semaphore:

View file

@ -42,28 +42,12 @@ export type LightragStatus = {
vector_storage: string vector_storage: string
workspace?: string workspace?: string
max_graph_nodes?: string max_graph_nodes?: string
enable_rerank?: boolean
rerank_model?: string | null
rerank_binding_host?: string | null
} }
update_status?: Record<string, any> update_status?: Record<string, any>
core_version?: string core_version?: string
api_version?: string api_version?: string
auth_mode?: 'enabled' | 'disabled' auth_mode?: 'enabled' | 'disabled'
pipeline_busy: boolean pipeline_busy: boolean
keyed_locks?: {
process_id: number
cleanup_performed: {
mp_cleaned: number
async_cleaned: number
}
current_status: {
total_mp_locks: number
pending_mp_cleanup: number
total_async_locks: number
pending_async_cleanup: number
}
}
webui_title?: string webui_title?: string
webui_description?: string webui_description?: string
} }

View file

@ -2,6 +2,7 @@ import { useCallback } from 'react'
import { QueryMode, QueryRequest } from '@/api/lightrag' import { QueryMode, QueryRequest } from '@/api/lightrag'
// Removed unused import for Text component // Removed unused import for Text component
import Checkbox from '@/components/ui/Checkbox' import Checkbox from '@/components/ui/Checkbox'
import NumberInput from '@/components/ui/NumberInput'
import Input from '@/components/ui/Input' import Input from '@/components/ui/Input'
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/Card' import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/Card'
import { import {
@ -120,20 +121,11 @@ export default function QuerySettings() {
</TooltipProvider> </TooltipProvider>
<div> <div>
{/* Removed sr-only label */} {/* Removed sr-only label */}
<Input <NumberInput
id="top_k" id="top_k"
type="number" stepper={1}
value={querySettings.top_k ?? ''} value={querySettings.top_k}
onChange={(e) => { onValueChange={(v) => handleChange('top_k', v)}
const value = e.target.value
handleChange('top_k', value === '' ? '' : parseInt(value) || 0)
}}
onBlur={(e) => {
const value = e.target.value
if (value === '' || isNaN(parseInt(value))) {
handleChange('top_k', 1)
}
}}
min={1} min={1}
placeholder={t('retrievePanel.querySettings.topKPlaceholder')} placeholder={t('retrievePanel.querySettings.topKPlaceholder')}
/> />
@ -286,23 +278,15 @@ export default function QuerySettings() {
</TooltipProvider> </TooltipProvider>
<div> <div>
{/* Removed sr-only label */} {/* Removed sr-only label */}
<Input <NumberInput
className="!border-input"
id="history_turns" id="history_turns"
type="number" stepper={1}
value={querySettings.history_turns ?? ''} type="text"
onChange={(e) => { value={querySettings.history_turns}
const value = e.target.value onValueChange={(v) => handleChange('history_turns', v)}
handleChange('history_turns', value === '' ? '' : parseInt(value) || 0)
}}
onBlur={(e) => {
const value = e.target.value
if (value === '' || isNaN(parseInt(value))) {
handleChange('history_turns', 0)
}
}}
min={0} min={0}
placeholder={t('retrievePanel.querySettings.historyTurnsPlaceholder')} placeholder={t('retrievePanel.querySettings.historyTurnsPlaceholder')}
className="h-9"
/> />
</div> </div>
</> </>

View file

@ -252,12 +252,12 @@
"inputDirectory": "دليل الإدخال", "inputDirectory": "دليل الإدخال",
"llmConfig": "تكوين نموذج اللغة الكبير", "llmConfig": "تكوين نموذج اللغة الكبير",
"llmBinding": "ربط نموذج اللغة الكبير", "llmBinding": "ربط نموذج اللغة الكبير",
"llmBindingHost": "نقطة نهاية نموذج اللغة الكبير", "llmBindingHost": "مضيف ربط نموذج اللغة الكبير",
"llmModel": "نموذج اللغة الكبير", "llmModel": "نموذج اللغة الكبير",
"maxTokens": "أقصى عدد من الرموز", "maxTokens": "أقصى عدد من الرموز",
"embeddingConfig": "تكوين التضمين", "embeddingConfig": "تكوين التضمين",
"embeddingBinding": "ربط التضمين", "embeddingBinding": "ربط التضمين",
"embeddingBindingHost": "نقطة نهاية التضمين", "embeddingBindingHost": "مضيف ربط التضمين",
"embeddingModel": "نموذج التضمين", "embeddingModel": "نموذج التضمين",
"storageConfig": "تكوين التخزين", "storageConfig": "تكوين التخزين",
"kvStorage": "تخزين المفتاح-القيمة", "kvStorage": "تخزين المفتاح-القيمة",
@ -265,11 +265,7 @@
"graphStorage": "تخزين الرسم البياني", "graphStorage": "تخزين الرسم البياني",
"vectorStorage": "تخزين المتجهات", "vectorStorage": "تخزين المتجهات",
"workspace": "مساحة العمل", "workspace": "مساحة العمل",
"maxGraphNodes": "الحد الأقصى لعقد الرسم البياني", "maxGraphNodes": "الحد الأقصى لعقد الرسم البياني"
"rerankerConfig": "تكوين إعادة الترتيب",
"rerankerBindingHost": "نقطة نهاية إعادة الترتيب",
"rerankerModel": "نموذج إعادة الترتيب",
"lockStatus": "حالة القفل"
}, },
"propertiesView": { "propertiesView": {
"editProperty": "تعديل {{property}}", "editProperty": "تعديل {{property}}",

View file

@ -252,12 +252,12 @@
"inputDirectory": "Input Directory", "inputDirectory": "Input Directory",
"llmConfig": "LLM Configuration", "llmConfig": "LLM Configuration",
"llmBinding": "LLM Binding", "llmBinding": "LLM Binding",
"llmBindingHost": "LLM Endpoint", "llmBindingHost": "LLM Binding Host",
"llmModel": "LLM Model", "llmModel": "LLM Model",
"maxTokens": "Max Tokens", "maxTokens": "Max Tokens",
"embeddingConfig": "Embedding Configuration", "embeddingConfig": "Embedding Configuration",
"embeddingBinding": "Embedding Binding", "embeddingBinding": "Embedding Binding",
"embeddingBindingHost": "Embedding Endpoint", "embeddingBindingHost": "Embedding Binding Host",
"embeddingModel": "Embedding Model", "embeddingModel": "Embedding Model",
"storageConfig": "Storage Configuration", "storageConfig": "Storage Configuration",
"kvStorage": "KV Storage", "kvStorage": "KV Storage",
@ -265,11 +265,7 @@
"graphStorage": "Graph Storage", "graphStorage": "Graph Storage",
"vectorStorage": "Vector Storage", "vectorStorage": "Vector Storage",
"workspace": "Workspace", "workspace": "Workspace",
"maxGraphNodes": "Max Graph Nodes", "maxGraphNodes": "Max Graph Nodes"
"rerankerConfig": "Reranker Configuration",
"rerankerBindingHost": "Reranker Endpoint",
"rerankerModel": "Reranker Model",
"lockStatus": "Lock Status"
}, },
"propertiesView": { "propertiesView": {
"editProperty": "Edit {{property}}", "editProperty": "Edit {{property}}",

View file

@ -252,12 +252,12 @@
"inputDirectory": "Répertoire d'entrée", "inputDirectory": "Répertoire d'entrée",
"llmConfig": "Configuration du modèle de langage", "llmConfig": "Configuration du modèle de langage",
"llmBinding": "Liaison du modèle de langage", "llmBinding": "Liaison du modèle de langage",
"llmBindingHost": "Point de terminaison LLM", "llmBindingHost": "Hôte de liaison du modèle de langage",
"llmModel": "Modèle de langage", "llmModel": "Modèle de langage",
"maxTokens": "Nombre maximum de jetons", "maxTokens": "Nombre maximum de jetons",
"embeddingConfig": "Configuration d'incorporation", "embeddingConfig": "Configuration d'incorporation",
"embeddingBinding": "Liaison d'incorporation", "embeddingBinding": "Liaison d'incorporation",
"embeddingBindingHost": "Point de terminaison d'incorporation", "embeddingBindingHost": "Hôte de liaison d'incorporation",
"embeddingModel": "Modèle d'incorporation", "embeddingModel": "Modèle d'incorporation",
"storageConfig": "Configuration de stockage", "storageConfig": "Configuration de stockage",
"kvStorage": "Stockage clé-valeur", "kvStorage": "Stockage clé-valeur",
@ -265,11 +265,7 @@
"graphStorage": "Stockage du graphe", "graphStorage": "Stockage du graphe",
"vectorStorage": "Stockage vectoriel", "vectorStorage": "Stockage vectoriel",
"workspace": "Espace de travail", "workspace": "Espace de travail",
"maxGraphNodes": "Nombre maximum de nœuds du graphe", "maxGraphNodes": "Nombre maximum de nœuds du graphe"
"rerankerConfig": "Configuration du reclassement",
"rerankerBindingHost": "Point de terminaison de reclassement",
"rerankerModel": "Modèle de reclassement",
"lockStatus": "État des verrous"
}, },
"propertiesView": { "propertiesView": {
"editProperty": "Modifier {{property}}", "editProperty": "Modifier {{property}}",

View file

@ -252,12 +252,12 @@
"inputDirectory": "输入目录", "inputDirectory": "输入目录",
"llmConfig": "LLM配置", "llmConfig": "LLM配置",
"llmBinding": "LLM绑定", "llmBinding": "LLM绑定",
"llmBindingHost": "LLM端点", "llmBindingHost": "LLM绑定主机",
"llmModel": "LLM模型", "llmModel": "LLM模型",
"maxTokens": "最大令牌数", "maxTokens": "最大令牌数",
"embeddingConfig": "嵌入配置", "embeddingConfig": "嵌入配置",
"embeddingBinding": "嵌入绑定", "embeddingBinding": "嵌入绑定",
"embeddingBindingHost": "嵌入端点", "embeddingBindingHost": "嵌入绑定主机",
"embeddingModel": "嵌入模型", "embeddingModel": "嵌入模型",
"storageConfig": "存储配置", "storageConfig": "存储配置",
"kvStorage": "KV存储", "kvStorage": "KV存储",
@ -265,11 +265,7 @@
"graphStorage": "图存储", "graphStorage": "图存储",
"vectorStorage": "向量存储", "vectorStorage": "向量存储",
"workspace": "工作空间", "workspace": "工作空间",
"maxGraphNodes": "最大图节点数", "maxGraphNodes": "最大图节点数"
"rerankerConfig": "重排序配置",
"rerankerBindingHost": "重排序端点",
"rerankerModel": "重排序模型",
"lockStatus": "锁状态"
}, },
"propertiesView": { "propertiesView": {
"editProperty": "编辑{{property}}", "editProperty": "编辑{{property}}",

View file

@ -252,12 +252,12 @@
"inputDirectory": "輸入目錄", "inputDirectory": "輸入目錄",
"llmConfig": "LLM 設定", "llmConfig": "LLM 設定",
"llmBinding": "LLM 綁定", "llmBinding": "LLM 綁定",
"llmBindingHost": "LLM 端點", "llmBindingHost": "LLM 綁定主機",
"llmModel": "LLM 模型", "llmModel": "LLM 模型",
"maxTokens": "最大權杖數", "maxTokens": "最大權杖數",
"embeddingConfig": "嵌入設定", "embeddingConfig": "嵌入設定",
"embeddingBinding": "嵌入綁定", "embeddingBinding": "嵌入綁定",
"embeddingBindingHost": "嵌入端點", "embeddingBindingHost": "嵌入綁定主機",
"embeddingModel": "嵌入模型", "embeddingModel": "嵌入模型",
"storageConfig": "儲存設定", "storageConfig": "儲存設定",
"kvStorage": "KV 儲存", "kvStorage": "KV 儲存",
@ -265,11 +265,7 @@
"graphStorage": "圖形儲存", "graphStorage": "圖形儲存",
"vectorStorage": "向量儲存", "vectorStorage": "向量儲存",
"workspace": "工作空間", "workspace": "工作空間",
"maxGraphNodes": "最大圖形節點數", "maxGraphNodes": "最大圖形節點數"
"rerankerConfig": "重排序設定",
"rerankerBindingHost": "重排序端點",
"rerankerModel": "重排序模型",
"lockStatus": "鎖定狀態"
}, },
"propertiesView": { "propertiesView": {
"editProperty": "編輯{{property}}", "editProperty": "編輯{{property}}",