Revert "update from main"

This reverts commit 1d0376d6a9.
This commit is contained in:
zrguo 2025-07-14 16:29:00 +08:00
parent 1d0376d6a9
commit 4e425b1b59
11 changed files with 332 additions and 205 deletions

View file

@ -250,7 +250,7 @@ if __name__ == "__main__":
| **embedding_func_max_async** | `int` | 最大并发异步嵌入进程数 | `16` | | **embedding_func_max_async** | `int` | 最大并发异步嵌入进程数 | `16` |
| **llm_model_func** | `callable` | LLM生成的函数 | `gpt_4o_mini_complete` | | **llm_model_func** | `callable` | LLM生成的函数 | `gpt_4o_mini_complete` |
| **llm_model_name** | `str` | 用于生成的LLM模型名称 | `meta-llama/Llama-3.2-1B-Instruct` | | **llm_model_name** | `str` | 用于生成的LLM模型名称 | `meta-llama/Llama-3.2-1B-Instruct` |
| **llm_model_max_token_size** | `int` | LLM生成的最大令牌大小影响实体关系摘要 | `32768`默认值由环境变量MAX_TOKENS更改 | | **llm_model_max_token_size** | `int` | 生成实体关系摘要时送给LLM的最大令牌数 | `32000`默认值由环境变量MAX_TOKENS更改 |
| **llm_model_max_async** | `int` | 最大并发异步LLM进程数 | `4`默认值由环境变量MAX_ASYNC更改 | | **llm_model_max_async** | `int` | 最大并发异步LLM进程数 | `4`默认值由环境变量MAX_ASYNC更改 |
| **llm_model_kwargs** | `dict` | LLM生成的附加参数 | | | **llm_model_kwargs** | `dict` | LLM生成的附加参数 | |
| **vector_db_storage_cls_kwargs** | `dict` | 向量数据库的附加参数,如设置节点和关系检索的阈值 | cosine_better_than_threshold: 0.2默认值由环境变量COSINE_THRESHOLD更改 | | **vector_db_storage_cls_kwargs** | `dict` | 向量数据库的附加参数,如设置节点和关系检索的阈值 | cosine_better_than_threshold: 0.2默认值由环境变量COSINE_THRESHOLD更改 |

View file

@ -257,7 +257,7 @@ A full list of LightRAG init parameters:
| **embedding_func_max_async** | `int` | Maximum number of concurrent asynchronous embedding processes | `16` | | **embedding_func_max_async** | `int` | Maximum number of concurrent asynchronous embedding processes | `16` |
| **llm_model_func** | `callable` | Function for LLM generation | `gpt_4o_mini_complete` | | **llm_model_func** | `callable` | Function for LLM generation | `gpt_4o_mini_complete` |
| **llm_model_name** | `str` | LLM model name for generation | `meta-llama/Llama-3.2-1B-Instruct` | | **llm_model_name** | `str` | LLM model name for generation | `meta-llama/Llama-3.2-1B-Instruct` |
| **llm_model_max_token_size** | `int` | Maximum token size for LLM generation (affects entity relation summaries) | `32768`default value changed by env var MAX_TOKENS) | | **llm_model_max_token_size** | `int` | Maximum tokens send to LLM to generate entity relation summaries | `32000`default value changed by env var MAX_TOKENS) |
| **llm_model_max_async** | `int` | Maximum number of concurrent asynchronous LLM processes | `4`default value changed by env var MAX_ASYNC) | | **llm_model_max_async** | `int` | Maximum number of concurrent asynchronous LLM processes | `4`default value changed by env var MAX_ASYNC) |
| **llm_model_kwargs** | `dict` | Additional parameters for LLM generation | | | **llm_model_kwargs** | `dict` | Additional parameters for LLM generation | |
| **vector_db_storage_cls_kwargs** | `dict` | Additional parameters for vector database, like setting the threshold for nodes and relations retrieval | cosine_better_than_threshold: 0.2default value changed by env var COSINE_THRESHOLD) | | **vector_db_storage_cls_kwargs** | `dict` | Additional parameters for vector database, like setting the threshold for nodes and relations retrieval | cosine_better_than_threshold: 0.2default value changed by env var COSINE_THRESHOLD) |

View file

@ -46,7 +46,6 @@ OLLAMA_EMULATING_MODEL_TAG=latest
### Chunk size for document splitting, 500~1500 is recommended ### Chunk size for document splitting, 500~1500 is recommended
# CHUNK_SIZE=1200 # CHUNK_SIZE=1200
# CHUNK_OVERLAP_SIZE=100 # CHUNK_OVERLAP_SIZE=100
# MAX_TOKEN_SUMMARY=500
### RAG Query Configuration ### RAG Query Configuration
# HISTORY_TURNS=3 # HISTORY_TURNS=3
@ -94,8 +93,7 @@ TEMPERATURE=0
### Max concurrency requests of LLM ### Max concurrency requests of LLM
MAX_ASYNC=4 MAX_ASYNC=4
### MAX_TOKENS: max tokens send to LLM for entity relation summaries (less than context size of the model) ### MAX_TOKENS: max tokens send to LLM for entity relation summaries (less than context size of the model)
### MAX_TOKENS: set as num_ctx option for Ollama by API Server MAX_TOKENS=32000
MAX_TOKENS=32768
### LLM Binding type: openai, ollama, lollms, azure_openai ### LLM Binding type: openai, ollama, lollms, azure_openai
LLM_BINDING=openai LLM_BINDING=openai
LLM_MODEL=gpt-4o LLM_MODEL=gpt-4o
@ -104,6 +102,8 @@ LLM_BINDING_API_KEY=your_api_key
### Optional for Azure ### Optional for Azure
# AZURE_OPENAI_API_VERSION=2024-08-01-preview # AZURE_OPENAI_API_VERSION=2024-08-01-preview
# AZURE_OPENAI_DEPLOYMENT=gpt-4o # AZURE_OPENAI_DEPLOYMENT=gpt-4o
### set as num_ctx option for Ollama LLM
# OLLAMA_NUM_CTX=32768
### Embedding Configuration ### Embedding Configuration
### Embedding Binding type: openai, ollama, lollms, azure_openai ### Embedding Binding type: openai, ollama, lollms, azure_openai
@ -116,7 +116,7 @@ EMBEDDING_BINDING_HOST=http://localhost:11434
### Num of chunks send to Embedding in single request ### Num of chunks send to Embedding in single request
# EMBEDDING_BATCH_NUM=10 # EMBEDDING_BATCH_NUM=10
### Max concurrency requests for Embedding ### Max concurrency requests for Embedding
# EMBEDDING_FUNC_MAX_ASYNC=16 # EMBEDDING_FUNC_MAX_ASYNC=8
### Maximum tokens sent to Embedding for each chunk (no longer in use?) ### Maximum tokens sent to Embedding for each chunk (no longer in use?)
# MAX_EMBED_TOKENS=8192 # MAX_EMBED_TOKENS=8192
### Optional for Azure ### Optional for Azure

View file

@ -37,6 +37,7 @@ from .base import (
) )
from .prompt import PROMPTS from .prompt import PROMPTS
from .constants import GRAPH_FIELD_SEP from .constants import GRAPH_FIELD_SEP
from .kg.shared_storage import get_storage_keyed_lock
import time import time
from dotenv import load_dotenv from dotenv import load_dotenv
@ -117,7 +118,7 @@ async def _handle_entity_relation_summary(
tokenizer: Tokenizer = global_config["tokenizer"] tokenizer: Tokenizer = global_config["tokenizer"]
llm_max_tokens = global_config["llm_model_max_token_size"] llm_max_tokens = global_config["llm_model_max_token_size"]
summary_max_tokens = global_config["summary_to_max_tokens"] # summary_max_tokens = global_config["summary_to_max_tokens"]
language = global_config["addon_params"].get( language = global_config["addon_params"].get(
"language", PROMPTS["DEFAULT_LANGUAGE"] "language", PROMPTS["DEFAULT_LANGUAGE"]
@ -144,7 +145,7 @@ async def _handle_entity_relation_summary(
use_prompt, use_prompt,
use_llm_func, use_llm_func,
llm_response_cache=llm_response_cache, llm_response_cache=llm_response_cache,
max_tokens=summary_max_tokens, # max_tokens=summary_max_tokens,
cache_type="extract", cache_type="extract",
) )
return summary return summary
@ -274,20 +275,26 @@ async def _rebuild_knowledge_from_chunks(
pipeline_status: dict | None = None, pipeline_status: dict | None = None,
pipeline_status_lock=None, pipeline_status_lock=None,
) -> None: ) -> None:
"""Rebuild entity and relationship descriptions from cached extraction results """Rebuild entity and relationship descriptions from cached extraction results with parallel processing
This method uses cached LLM extraction results instead of calling LLM again, This method uses cached LLM extraction results instead of calling LLM again,
following the same approach as the insert process. following the same approach as the insert process. Now with parallel processing
controlled by llm_model_max_async and using get_storage_keyed_lock for data consistency.
Args: Args:
entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids
relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids
text_chunks_data: Pre-loaded chunk data dict {chunk_id: chunk_data} knowledge_graph_inst: Knowledge graph storage
entities_vdb: Entity vector database
relationships_vdb: Relationship vector database
text_chunks_storage: Text chunks storage
llm_response_cache: LLM response cache
global_config: Global configuration containing llm_model_max_async
pipeline_status: Pipeline status dictionary
pipeline_status_lock: Lock for pipeline status
""" """
if not entities_to_rebuild and not relationships_to_rebuild: if not entities_to_rebuild and not relationships_to_rebuild:
return return
rebuilt_entities_count = 0
rebuilt_relationships_count = 0
# Get all referenced chunk IDs # Get all referenced chunk IDs
all_referenced_chunk_ids = set() all_referenced_chunk_ids = set()
@ -296,7 +303,7 @@ async def _rebuild_knowledge_from_chunks(
for chunk_ids in relationships_to_rebuild.values(): for chunk_ids in relationships_to_rebuild.values():
all_referenced_chunk_ids.update(chunk_ids) all_referenced_chunk_ids.update(chunk_ids)
status_message = f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions" status_message = f"Rebuilding knowledge from {len(all_referenced_chunk_ids)} cached chunk extractions (parallel processing)"
logger.info(status_message) logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None: if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock: async with pipeline_status_lock:
@ -366,66 +373,116 @@ async def _rebuild_knowledge_from_chunks(
pipeline_status["history_messages"].append(status_message) pipeline_status["history_messages"].append(status_message)
continue continue
# Rebuild entities # Get max async tasks limit from global_config for semaphore control
graph_max_async = global_config.get("llm_model_max_async", 4) * 2
semaphore = asyncio.Semaphore(graph_max_async)
# Counters for tracking progress
rebuilt_entities_count = 0
rebuilt_relationships_count = 0
failed_entities_count = 0
failed_relationships_count = 0
async def _locked_rebuild_entity(entity_name, chunk_ids):
nonlocal rebuilt_entities_count, failed_entities_count
async with semaphore:
workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
[entity_name], namespace=namespace, enable_logging=False
):
try:
await _rebuild_single_entity(
knowledge_graph_inst=knowledge_graph_inst,
entities_vdb=entities_vdb,
entity_name=entity_name,
chunk_ids=chunk_ids,
chunk_entities=chunk_entities,
llm_response_cache=llm_response_cache,
global_config=global_config,
)
rebuilt_entities_count += 1
status_message = (
f"Rebuilt entity: {entity_name} from {len(chunk_ids)} chunks"
)
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
except Exception as e:
failed_entities_count += 1
status_message = f"Failed to rebuild entity {entity_name}: {e}"
logger.info(status_message) # Per requirement, change to info
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
async def _locked_rebuild_relationship(src, tgt, chunk_ids):
nonlocal rebuilt_relationships_count, failed_relationships_count
async with semaphore:
workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
f"{src}-{tgt}", namespace=namespace, enable_logging=False
):
try:
await _rebuild_single_relationship(
knowledge_graph_inst=knowledge_graph_inst,
relationships_vdb=relationships_vdb,
src=src,
tgt=tgt,
chunk_ids=chunk_ids,
chunk_relationships=chunk_relationships,
llm_response_cache=llm_response_cache,
global_config=global_config,
)
rebuilt_relationships_count += 1
status_message = f"Rebuilt relationship: {src}->{tgt} from {len(chunk_ids)} chunks"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
except Exception as e:
failed_relationships_count += 1
status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}"
logger.info(status_message) # Per requirement, change to info
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
# Create tasks for parallel processing
tasks = []
# Add entity rebuilding tasks
for entity_name, chunk_ids in entities_to_rebuild.items(): for entity_name, chunk_ids in entities_to_rebuild.items():
try: task = asyncio.create_task(_locked_rebuild_entity(entity_name, chunk_ids))
await _rebuild_single_entity( tasks.append(task)
knowledge_graph_inst=knowledge_graph_inst,
entities_vdb=entities_vdb,
entity_name=entity_name,
chunk_ids=chunk_ids,
chunk_entities=chunk_entities,
llm_response_cache=llm_response_cache,
global_config=global_config,
)
rebuilt_entities_count += 1
status_message = (
f"Rebuilt entity: {entity_name} from {len(chunk_ids)} chunks"
)
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
except Exception as e:
status_message = f"Failed to rebuild entity {entity_name}: {e}"
logger.info(status_message) # Per requirement, change to info
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
# Rebuild relationships # Add relationship rebuilding tasks
for (src, tgt), chunk_ids in relationships_to_rebuild.items(): for (src, tgt), chunk_ids in relationships_to_rebuild.items():
try: task = asyncio.create_task(_locked_rebuild_relationship(src, tgt, chunk_ids))
await _rebuild_single_relationship( tasks.append(task)
knowledge_graph_inst=knowledge_graph_inst,
relationships_vdb=relationships_vdb, # Log parallel processing start
src=src, status_message = f"Starting parallel rebuild of {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships (async: {graph_max_async})"
tgt=tgt, logger.info(status_message)
chunk_ids=chunk_ids, if pipeline_status is not None and pipeline_status_lock is not None:
chunk_relationships=chunk_relationships, async with pipeline_status_lock:
llm_response_cache=llm_response_cache, pipeline_status["latest_message"] = status_message
global_config=global_config, pipeline_status["history_messages"].append(status_message)
)
rebuilt_relationships_count += 1 # Execute all tasks in parallel with semaphore control
status_message = ( await asyncio.gather(*tasks)
f"Rebuilt relationship: {src}->{tgt} from {len(chunk_ids)} chunks"
) # Final status report
logger.info(status_message) status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships rebuilt successfully."
if pipeline_status is not None and pipeline_status_lock is not None: if failed_entities_count > 0 or failed_relationships_count > 0:
async with pipeline_status_lock: status_message += f" Failed: {failed_entities_count} entities, {failed_relationships_count} relationships."
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
except Exception as e:
status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
status_message = f"KG rebuild completed: {rebuilt_entities_count} entities and {rebuilt_relationships_count} relationships."
logger.info(status_message) logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None: if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock: async with pipeline_status_lock:
@ -630,7 +687,10 @@ async def _rebuild_single_entity(
# Helper function to generate final description with optional LLM summary # Helper function to generate final description with optional LLM summary
async def _generate_final_description(combined_description: str) -> str: async def _generate_final_description(combined_description: str) -> str:
if len(combined_description) > global_config["summary_to_max_tokens"]: force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1
if num_fragment >= force_llm_summary_on_merge:
return await _handle_entity_relation_summary( return await _handle_entity_relation_summary(
entity_name, entity_name,
combined_description, combined_description,
@ -725,7 +785,11 @@ async def _rebuild_single_relationship(
llm_response_cache: BaseKVStorage, llm_response_cache: BaseKVStorage,
global_config: dict[str, str], global_config: dict[str, str],
) -> None: ) -> None:
"""Rebuild a single relationship from cached extraction results""" """Rebuild a single relationship from cached extraction results
Note: This function assumes the caller has already acquired the appropriate
keyed lock for the relationship pair to ensure thread safety.
"""
# Get current relationship data # Get current relationship data
current_relationship = await knowledge_graph_inst.get_edge(src, tgt) current_relationship = await knowledge_graph_inst.get_edge(src, tgt)
@ -781,8 +845,11 @@ async def _rebuild_single_relationship(
# ) # )
weight = sum(weights) if weights else current_relationship.get("weight", 1.0) weight = sum(weights) if weights else current_relationship.get("weight", 1.0)
# Use summary if description is too long # Use summary if description has too many fragments
if len(combined_description) > global_config["summary_to_max_tokens"]: force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1
if num_fragment >= force_llm_summary_on_merge:
final_description = await _handle_entity_relation_summary( final_description = await _handle_entity_relation_summary(
f"{src}-{tgt}", f"{src}-{tgt}",
combined_description, combined_description,
@ -1015,28 +1082,23 @@ async def _merge_edges_then_upsert(
) )
for need_insert_id in [src_id, tgt_id]: for need_insert_id in [src_id, tgt_id]:
if not (await knowledge_graph_inst.has_node(need_insert_id)): workspace = global_config.get("workspace", "")
# # Discard this edge if the node does not exist namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
# if need_insert_id == src_id: async with get_storage_keyed_lock(
# logger.warning( [need_insert_id], namespace=namespace, enable_logging=False
# f"Discard edge: {src_id} - {tgt_id} | Source node missing" ):
# ) if not (await knowledge_graph_inst.has_node(need_insert_id)):
# else: await knowledge_graph_inst.upsert_node(
# logger.warning( need_insert_id,
# f"Discard edge: {src_id} - {tgt_id} | Target node missing" node_data={
# ) "entity_id": need_insert_id,
# return None "source_id": source_id,
await knowledge_graph_inst.upsert_node( "description": description,
need_insert_id, "entity_type": "UNKNOWN",
node_data={ "file_path": file_path,
"entity_id": need_insert_id, "created_at": int(time.time()),
"source_id": source_id, },
"description": description, )
"entity_type": "UNKNOWN",
"file_path": file_path,
"created_at": int(time.time()),
},
)
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"] force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
@ -1118,8 +1180,6 @@ async def merge_nodes_and_edges(
pipeline_status_lock: Lock for pipeline status pipeline_status_lock: Lock for pipeline status
llm_response_cache: LLM response cache llm_response_cache: LLM response cache
""" """
# Get lock manager from shared storage
from .kg.shared_storage import get_graph_db_lock
# Collect all nodes and edges from all chunks # Collect all nodes and edges from all chunks
all_nodes = defaultdict(list) all_nodes = defaultdict(list)
@ -1136,94 +1196,109 @@ async def merge_nodes_and_edges(
all_edges[sorted_edge_key].extend(edges) all_edges[sorted_edge_key].extend(edges)
# Centralized processing of all nodes and edges # Centralized processing of all nodes and edges
entities_data = [] total_entities_count = len(all_nodes)
relationships_data = [] total_relations_count = len(all_edges)
# Merge nodes and edges # Merge nodes and edges
# Use graph database lock to ensure atomic merges and updates log_message = f"Merging stage {current_file_number}/{total_files}: {file_path}"
graph_db_lock = get_graph_db_lock(enable_logging=False) logger.info(log_message)
async with graph_db_lock: async with pipeline_status_lock:
async with pipeline_status_lock: pipeline_status["latest_message"] = log_message
log_message = ( pipeline_status["history_messages"].append(log_message)
f"Merging stage {current_file_number}/{total_files}: {file_path}"
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# Process and update all entities at once # Get max async tasks limit from global_config for semaphore control
for entity_name, entities in all_nodes.items(): graph_max_async = global_config.get("llm_model_max_async", 4) * 2
entity_data = await _merge_nodes_then_upsert( semaphore = asyncio.Semaphore(graph_max_async)
entity_name,
entities,
knowledge_graph_inst,
global_config,
pipeline_status,
pipeline_status_lock,
llm_response_cache,
)
entities_data.append(entity_data)
# Process and update all relationships at once # Process and update all entities and relationships in parallel
for edge_key, edges in all_edges.items(): log_message = f"Processing: {total_entities_count} entities and {total_relations_count} relations (async: {graph_max_async})"
edge_data = await _merge_edges_then_upsert( logger.info(log_message)
edge_key[0], async with pipeline_status_lock:
edge_key[1], pipeline_status["latest_message"] = log_message
edges, pipeline_status["history_messages"].append(log_message)
knowledge_graph_inst,
global_config,
pipeline_status,
pipeline_status_lock,
llm_response_cache,
)
if edge_data is not None:
relationships_data.append(edge_data)
# Update total counts async def _locked_process_entity_name(entity_name, entities):
total_entities_count = len(entities_data) async with semaphore:
total_relations_count = len(relationships_data) workspace = global_config.get("workspace", "")
namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
async with get_storage_keyed_lock(
[entity_name], namespace=namespace, enable_logging=False
):
entity_data = await _merge_nodes_then_upsert(
entity_name,
entities,
knowledge_graph_inst,
global_config,
pipeline_status,
pipeline_status_lock,
llm_response_cache,
)
if entity_vdb is not None:
data_for_vdb = {
compute_mdhash_id(entity_data["entity_name"], prefix="ent-"): {
"entity_name": entity_data["entity_name"],
"entity_type": entity_data["entity_type"],
"content": f"{entity_data['entity_name']}\n{entity_data['description']}",
"source_id": entity_data["source_id"],
"file_path": entity_data.get("file_path", "unknown_source"),
}
}
await entity_vdb.upsert(data_for_vdb)
return entity_data
log_message = f"Updating {total_entities_count} entities {current_file_number}/{total_files}: {file_path}" async def _locked_process_edges(edge_key, edges):
logger.info(log_message) async with semaphore:
if pipeline_status is not None: workspace = global_config.get("workspace", "")
async with pipeline_status_lock: namespace = f"{workspace}:GraphDB" if workspace else "GraphDB"
pipeline_status["latest_message"] = log_message async with get_storage_keyed_lock(
pipeline_status["history_messages"].append(log_message) f"{edge_key[0]}-{edge_key[1]}",
namespace=namespace,
enable_logging=False,
):
edge_data = await _merge_edges_then_upsert(
edge_key[0],
edge_key[1],
edges,
knowledge_graph_inst,
global_config,
pipeline_status,
pipeline_status_lock,
llm_response_cache,
)
if edge_data is None:
return None
# Update vector databases with all collected data if relationships_vdb is not None:
if entity_vdb is not None and entities_data: data_for_vdb = {
data_for_vdb = { compute_mdhash_id(
compute_mdhash_id(dp["entity_name"], prefix="ent-"): { edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-"
"entity_name": dp["entity_name"], ): {
"entity_type": dp["entity_type"], "src_id": edge_data["src_id"],
"content": f"{dp['entity_name']}\n{dp['description']}", "tgt_id": edge_data["tgt_id"],
"source_id": dp["source_id"], "keywords": edge_data["keywords"],
"file_path": dp.get("file_path", "unknown_source"), "content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}",
} "source_id": edge_data["source_id"],
for dp in entities_data "file_path": edge_data.get("file_path", "unknown_source"),
} }
await entity_vdb.upsert(data_for_vdb) }
await relationships_vdb.upsert(data_for_vdb)
return edge_data
log_message = f"Updating {total_relations_count} relations {current_file_number}/{total_files}: {file_path}" # Create a single task queue for both entities and edges
logger.info(log_message) tasks = []
if pipeline_status is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
if relationships_vdb is not None and relationships_data: # Add entity processing tasks
data_for_vdb = { for entity_name, entities in all_nodes.items():
compute_mdhash_id(dp["src_id"] + dp["tgt_id"], prefix="rel-"): { tasks.append(
"src_id": dp["src_id"], asyncio.create_task(_locked_process_entity_name(entity_name, entities))
"tgt_id": dp["tgt_id"], )
"keywords": dp["keywords"],
"content": f"{dp['src_id']}\t{dp['tgt_id']}\n{dp['keywords']}\n{dp['description']}", # Add edge processing tasks
"source_id": dp["source_id"], for edge_key, edges in all_edges.items():
"file_path": dp.get("file_path", "unknown_source"), tasks.append(asyncio.create_task(_locked_process_edges(edge_key, edges)))
}
for dp in relationships_data # Execute all tasks in parallel with semaphore control
} await asyncio.gather(*tasks)
await relationships_vdb.upsert(data_for_vdb)
async def extract_entities( async def extract_entities(
@ -1433,8 +1508,8 @@ async def extract_entities(
return maybe_nodes, maybe_edges return maybe_nodes, maybe_edges
# Get max async tasks limit from global_config # Get max async tasks limit from global_config
llm_model_max_async = global_config.get("llm_model_max_async", 4) chunk_max_async = global_config.get("llm_model_max_async", 4)
semaphore = asyncio.Semaphore(llm_model_max_async) semaphore = asyncio.Semaphore(chunk_max_async)
async def _process_with_semaphore(chunk): async def _process_with_semaphore(chunk):
async with semaphore: async with semaphore:

View file

@ -42,12 +42,28 @@ export type LightragStatus = {
vector_storage: string vector_storage: string
workspace?: string workspace?: string
max_graph_nodes?: string max_graph_nodes?: string
enable_rerank?: boolean
rerank_model?: string | null
rerank_binding_host?: string | null
} }
update_status?: Record<string, any> update_status?: Record<string, any>
core_version?: string core_version?: string
api_version?: string api_version?: string
auth_mode?: 'enabled' | 'disabled' auth_mode?: 'enabled' | 'disabled'
pipeline_busy: boolean pipeline_busy: boolean
keyed_locks?: {
process_id: number
cleanup_performed: {
mp_cleaned: number
async_cleaned: number
}
current_status: {
total_mp_locks: number
pending_mp_cleanup: number
total_async_locks: number
pending_async_cleanup: number
}
}
webui_title?: string webui_title?: string
webui_description?: string webui_description?: string
} }

View file

@ -2,7 +2,6 @@ import { useCallback } from 'react'
import { QueryMode, QueryRequest } from '@/api/lightrag' import { QueryMode, QueryRequest } from '@/api/lightrag'
// Removed unused import for Text component // Removed unused import for Text component
import Checkbox from '@/components/ui/Checkbox' import Checkbox from '@/components/ui/Checkbox'
import NumberInput from '@/components/ui/NumberInput'
import Input from '@/components/ui/Input' import Input from '@/components/ui/Input'
import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/Card' import { Card, CardContent, CardDescription, CardHeader, CardTitle } from '@/components/ui/Card'
import { import {
@ -121,11 +120,20 @@ export default function QuerySettings() {
</TooltipProvider> </TooltipProvider>
<div> <div>
{/* Removed sr-only label */} {/* Removed sr-only label */}
<NumberInput <Input
id="top_k" id="top_k"
stepper={1} type="number"
value={querySettings.top_k} value={querySettings.top_k ?? ''}
onValueChange={(v) => handleChange('top_k', v)} onChange={(e) => {
const value = e.target.value
handleChange('top_k', value === '' ? '' : parseInt(value) || 0)
}}
onBlur={(e) => {
const value = e.target.value
if (value === '' || isNaN(parseInt(value))) {
handleChange('top_k', 1)
}
}}
min={1} min={1}
placeholder={t('retrievePanel.querySettings.topKPlaceholder')} placeholder={t('retrievePanel.querySettings.topKPlaceholder')}
/> />
@ -278,15 +286,23 @@ export default function QuerySettings() {
</TooltipProvider> </TooltipProvider>
<div> <div>
{/* Removed sr-only label */} {/* Removed sr-only label */}
<NumberInput <Input
className="!border-input"
id="history_turns" id="history_turns"
stepper={1} type="number"
type="text" value={querySettings.history_turns ?? ''}
value={querySettings.history_turns} onChange={(e) => {
onValueChange={(v) => handleChange('history_turns', v)} const value = e.target.value
handleChange('history_turns', value === '' ? '' : parseInt(value) || 0)
}}
onBlur={(e) => {
const value = e.target.value
if (value === '' || isNaN(parseInt(value))) {
handleChange('history_turns', 0)
}
}}
min={0} min={0}
placeholder={t('retrievePanel.querySettings.historyTurnsPlaceholder')} placeholder={t('retrievePanel.querySettings.historyTurnsPlaceholder')}
className="h-9"
/> />
</div> </div>
</> </>

View file

@ -252,12 +252,12 @@
"inputDirectory": "دليل الإدخال", "inputDirectory": "دليل الإدخال",
"llmConfig": "تكوين نموذج اللغة الكبير", "llmConfig": "تكوين نموذج اللغة الكبير",
"llmBinding": "ربط نموذج اللغة الكبير", "llmBinding": "ربط نموذج اللغة الكبير",
"llmBindingHost": "مضيف ربط نموذج اللغة الكبير", "llmBindingHost": "نقطة نهاية نموذج اللغة الكبير",
"llmModel": "نموذج اللغة الكبير", "llmModel": "نموذج اللغة الكبير",
"maxTokens": "أقصى عدد من الرموز", "maxTokens": "أقصى عدد من الرموز",
"embeddingConfig": "تكوين التضمين", "embeddingConfig": "تكوين التضمين",
"embeddingBinding": "ربط التضمين", "embeddingBinding": "ربط التضمين",
"embeddingBindingHost": "مضيف ربط التضمين", "embeddingBindingHost": "نقطة نهاية التضمين",
"embeddingModel": "نموذج التضمين", "embeddingModel": "نموذج التضمين",
"storageConfig": "تكوين التخزين", "storageConfig": "تكوين التخزين",
"kvStorage": "تخزين المفتاح-القيمة", "kvStorage": "تخزين المفتاح-القيمة",
@ -265,7 +265,11 @@
"graphStorage": "تخزين الرسم البياني", "graphStorage": "تخزين الرسم البياني",
"vectorStorage": "تخزين المتجهات", "vectorStorage": "تخزين المتجهات",
"workspace": "مساحة العمل", "workspace": "مساحة العمل",
"maxGraphNodes": "الحد الأقصى لعقد الرسم البياني" "maxGraphNodes": "الحد الأقصى لعقد الرسم البياني",
"rerankerConfig": "تكوين إعادة الترتيب",
"rerankerBindingHost": "نقطة نهاية إعادة الترتيب",
"rerankerModel": "نموذج إعادة الترتيب",
"lockStatus": "حالة القفل"
}, },
"propertiesView": { "propertiesView": {
"editProperty": "تعديل {{property}}", "editProperty": "تعديل {{property}}",

View file

@ -252,12 +252,12 @@
"inputDirectory": "Input Directory", "inputDirectory": "Input Directory",
"llmConfig": "LLM Configuration", "llmConfig": "LLM Configuration",
"llmBinding": "LLM Binding", "llmBinding": "LLM Binding",
"llmBindingHost": "LLM Binding Host", "llmBindingHost": "LLM Endpoint",
"llmModel": "LLM Model", "llmModel": "LLM Model",
"maxTokens": "Max Tokens", "maxTokens": "Max Tokens",
"embeddingConfig": "Embedding Configuration", "embeddingConfig": "Embedding Configuration",
"embeddingBinding": "Embedding Binding", "embeddingBinding": "Embedding Binding",
"embeddingBindingHost": "Embedding Binding Host", "embeddingBindingHost": "Embedding Endpoint",
"embeddingModel": "Embedding Model", "embeddingModel": "Embedding Model",
"storageConfig": "Storage Configuration", "storageConfig": "Storage Configuration",
"kvStorage": "KV Storage", "kvStorage": "KV Storage",
@ -265,7 +265,11 @@
"graphStorage": "Graph Storage", "graphStorage": "Graph Storage",
"vectorStorage": "Vector Storage", "vectorStorage": "Vector Storage",
"workspace": "Workspace", "workspace": "Workspace",
"maxGraphNodes": "Max Graph Nodes" "maxGraphNodes": "Max Graph Nodes",
"rerankerConfig": "Reranker Configuration",
"rerankerBindingHost": "Reranker Endpoint",
"rerankerModel": "Reranker Model",
"lockStatus": "Lock Status"
}, },
"propertiesView": { "propertiesView": {
"editProperty": "Edit {{property}}", "editProperty": "Edit {{property}}",

View file

@ -252,12 +252,12 @@
"inputDirectory": "Répertoire d'entrée", "inputDirectory": "Répertoire d'entrée",
"llmConfig": "Configuration du modèle de langage", "llmConfig": "Configuration du modèle de langage",
"llmBinding": "Liaison du modèle de langage", "llmBinding": "Liaison du modèle de langage",
"llmBindingHost": "Hôte de liaison du modèle de langage", "llmBindingHost": "Point de terminaison LLM",
"llmModel": "Modèle de langage", "llmModel": "Modèle de langage",
"maxTokens": "Nombre maximum de jetons", "maxTokens": "Nombre maximum de jetons",
"embeddingConfig": "Configuration d'incorporation", "embeddingConfig": "Configuration d'incorporation",
"embeddingBinding": "Liaison d'incorporation", "embeddingBinding": "Liaison d'incorporation",
"embeddingBindingHost": "Hôte de liaison d'incorporation", "embeddingBindingHost": "Point de terminaison d'incorporation",
"embeddingModel": "Modèle d'incorporation", "embeddingModel": "Modèle d'incorporation",
"storageConfig": "Configuration de stockage", "storageConfig": "Configuration de stockage",
"kvStorage": "Stockage clé-valeur", "kvStorage": "Stockage clé-valeur",
@ -265,7 +265,11 @@
"graphStorage": "Stockage du graphe", "graphStorage": "Stockage du graphe",
"vectorStorage": "Stockage vectoriel", "vectorStorage": "Stockage vectoriel",
"workspace": "Espace de travail", "workspace": "Espace de travail",
"maxGraphNodes": "Nombre maximum de nœuds du graphe" "maxGraphNodes": "Nombre maximum de nœuds du graphe",
"rerankerConfig": "Configuration du reclassement",
"rerankerBindingHost": "Point de terminaison de reclassement",
"rerankerModel": "Modèle de reclassement",
"lockStatus": "État des verrous"
}, },
"propertiesView": { "propertiesView": {
"editProperty": "Modifier {{property}}", "editProperty": "Modifier {{property}}",

View file

@ -252,12 +252,12 @@
"inputDirectory": "输入目录", "inputDirectory": "输入目录",
"llmConfig": "LLM配置", "llmConfig": "LLM配置",
"llmBinding": "LLM绑定", "llmBinding": "LLM绑定",
"llmBindingHost": "LLM绑定主机", "llmBindingHost": "LLM端点",
"llmModel": "LLM模型", "llmModel": "LLM模型",
"maxTokens": "最大令牌数", "maxTokens": "最大令牌数",
"embeddingConfig": "嵌入配置", "embeddingConfig": "嵌入配置",
"embeddingBinding": "嵌入绑定", "embeddingBinding": "嵌入绑定",
"embeddingBindingHost": "嵌入绑定主机", "embeddingBindingHost": "嵌入端点",
"embeddingModel": "嵌入模型", "embeddingModel": "嵌入模型",
"storageConfig": "存储配置", "storageConfig": "存储配置",
"kvStorage": "KV存储", "kvStorage": "KV存储",
@ -265,7 +265,11 @@
"graphStorage": "图存储", "graphStorage": "图存储",
"vectorStorage": "向量存储", "vectorStorage": "向量存储",
"workspace": "工作空间", "workspace": "工作空间",
"maxGraphNodes": "最大图节点数" "maxGraphNodes": "最大图节点数",
"rerankerConfig": "重排序配置",
"rerankerBindingHost": "重排序端点",
"rerankerModel": "重排序模型",
"lockStatus": "锁状态"
}, },
"propertiesView": { "propertiesView": {
"editProperty": "编辑{{property}}", "editProperty": "编辑{{property}}",

View file

@ -252,12 +252,12 @@
"inputDirectory": "輸入目錄", "inputDirectory": "輸入目錄",
"llmConfig": "LLM 設定", "llmConfig": "LLM 設定",
"llmBinding": "LLM 綁定", "llmBinding": "LLM 綁定",
"llmBindingHost": "LLM 綁定主機", "llmBindingHost": "LLM 端點",
"llmModel": "LLM 模型", "llmModel": "LLM 模型",
"maxTokens": "最大權杖數", "maxTokens": "最大權杖數",
"embeddingConfig": "嵌入設定", "embeddingConfig": "嵌入設定",
"embeddingBinding": "嵌入綁定", "embeddingBinding": "嵌入綁定",
"embeddingBindingHost": "嵌入綁定主機", "embeddingBindingHost": "嵌入端點",
"embeddingModel": "嵌入模型", "embeddingModel": "嵌入模型",
"storageConfig": "儲存設定", "storageConfig": "儲存設定",
"kvStorage": "KV 儲存", "kvStorage": "KV 儲存",
@ -265,7 +265,11 @@
"graphStorage": "圖形儲存", "graphStorage": "圖形儲存",
"vectorStorage": "向量儲存", "vectorStorage": "向量儲存",
"workspace": "工作空間", "workspace": "工作空間",
"maxGraphNodes": "最大圖形節點數" "maxGraphNodes": "最大圖形節點數",
"rerankerConfig": "重排序設定",
"rerankerBindingHost": "重排序端點",
"rerankerModel": "重排序模型",
"lockStatus": "鎖定狀態"
}, },
"propertiesView": { "propertiesView": {
"editProperty": "編輯{{property}}", "editProperty": "編輯{{property}}",