Optimize async task limits for graph processing
- Increased concurrency for graph operations - Renamed variables for clarity - Updated status messages
This commit is contained in:
parent
ab805b35c4
commit
f185b3fb38
1 changed files with 10 additions and 10 deletions
|
|
@ -374,8 +374,8 @@ async def _rebuild_knowledge_from_chunks(
|
||||||
continue
|
continue
|
||||||
|
|
||||||
# Get max async tasks limit from global_config for semaphore control
|
# Get max async tasks limit from global_config for semaphore control
|
||||||
llm_model_max_async = global_config.get("llm_model_max_async", 4) + 1
|
graph_max_async = global_config.get("llm_model_max_async", 4) * 2
|
||||||
semaphore = asyncio.Semaphore(llm_model_max_async)
|
semaphore = asyncio.Semaphore(graph_max_async)
|
||||||
|
|
||||||
# Counters for tracking progress
|
# Counters for tracking progress
|
||||||
rebuilt_entities_count = 0
|
rebuilt_entities_count = 0
|
||||||
|
|
@ -468,7 +468,7 @@ async def _rebuild_knowledge_from_chunks(
|
||||||
tasks.append(task)
|
tasks.append(task)
|
||||||
|
|
||||||
# Log parallel processing start
|
# Log parallel processing start
|
||||||
status_message = f"Starting parallel rebuild of {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships (max concurrent: {llm_model_max_async})"
|
status_message = f"Starting parallel rebuild of {len(entities_to_rebuild)} entities and {len(relationships_to_rebuild)} relationships (async: {graph_max_async})"
|
||||||
logger.info(status_message)
|
logger.info(status_message)
|
||||||
if pipeline_status is not None and pipeline_status_lock is not None:
|
if pipeline_status is not None and pipeline_status_lock is not None:
|
||||||
async with pipeline_status_lock:
|
async with pipeline_status_lock:
|
||||||
|
|
@ -1200,17 +1200,17 @@ async def merge_nodes_and_edges(
|
||||||
pipeline_status["latest_message"] = log_message
|
pipeline_status["latest_message"] = log_message
|
||||||
pipeline_status["history_messages"].append(log_message)
|
pipeline_status["history_messages"].append(log_message)
|
||||||
|
|
||||||
|
# Get max async tasks limit from global_config for semaphore control
|
||||||
|
graph_max_async = global_config.get("llm_model_max_async", 4) * 2
|
||||||
|
semaphore = asyncio.Semaphore(graph_max_async)
|
||||||
|
|
||||||
# Process and update all entities and relationships in parallel
|
# Process and update all entities and relationships in parallel
|
||||||
log_message = f"Processing: {total_entities_count} entities and {total_relations_count} relations"
|
log_message = f"Processing: {total_entities_count} entities and {total_relations_count} relations (async: {graph_max_async})"
|
||||||
logger.info(log_message)
|
logger.info(log_message)
|
||||||
async with pipeline_status_lock:
|
async with pipeline_status_lock:
|
||||||
pipeline_status["latest_message"] = log_message
|
pipeline_status["latest_message"] = log_message
|
||||||
pipeline_status["history_messages"].append(log_message)
|
pipeline_status["history_messages"].append(log_message)
|
||||||
|
|
||||||
# Get max async tasks limit from global_config for semaphore control
|
|
||||||
llm_model_max_async = global_config.get("llm_model_max_async", 4) + 1
|
|
||||||
semaphore = asyncio.Semaphore(llm_model_max_async)
|
|
||||||
|
|
||||||
async def _locked_process_entity_name(entity_name, entities):
|
async def _locked_process_entity_name(entity_name, entities):
|
||||||
async with semaphore:
|
async with semaphore:
|
||||||
workspace = global_config.get("workspace", "")
|
workspace = global_config.get("workspace", "")
|
||||||
|
|
@ -1502,8 +1502,8 @@ async def extract_entities(
|
||||||
return maybe_nodes, maybe_edges
|
return maybe_nodes, maybe_edges
|
||||||
|
|
||||||
# Get max async tasks limit from global_config
|
# Get max async tasks limit from global_config
|
||||||
llm_model_max_async = global_config.get("llm_model_max_async", 4)
|
chunk_max_async = global_config.get("llm_model_max_async", 4)
|
||||||
semaphore = asyncio.Semaphore(llm_model_max_async)
|
semaphore = asyncio.Semaphore(chunk_max_async)
|
||||||
|
|
||||||
async def _process_with_semaphore(chunk):
|
async def _process_with_semaphore(chunk):
|
||||||
async with semaphore:
|
async with semaphore:
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue