Combine entity and edge processing tasks and optimize merging with a semaphore
This commit is contained in:
parent
207f0a7f2a
commit
ef4870fda5
2 changed files with 72 additions and 67 deletions
|
|
@ -1078,7 +1078,6 @@ class LightRAG:
|
||||||
# Semaphore is released here
|
# Semaphore is released here
|
||||||
# Concurrency is controlled by graph db lock for individual entities and relationships
|
# Concurrency is controlled by graph db lock for individual entities and relationships
|
||||||
|
|
||||||
|
|
||||||
if file_extraction_stage_ok:
|
if file_extraction_stage_ok:
|
||||||
try:
|
try:
|
||||||
# Get chunk_results from entity_relation_task
|
# Get chunk_results from entity_relation_task
|
||||||
|
|
|
||||||
|
|
@ -1016,7 +1016,7 @@ async def _merge_edges_then_upsert(
|
||||||
)
|
)
|
||||||
|
|
||||||
for need_insert_id in [src_id, tgt_id]:
|
for need_insert_id in [src_id, tgt_id]:
|
||||||
if (await knowledge_graph_inst.has_node(need_insert_id)):
|
if await knowledge_graph_inst.has_node(need_insert_id):
|
||||||
# This is so that the initial check for the existence of the node need not be locked
|
# This is so that the initial check for the existence of the node need not be locked
|
||||||
continue
|
continue
|
||||||
async with get_graph_db_lock_keyed([need_insert_id], enable_logging=False):
|
async with get_graph_db_lock_keyed([need_insert_id], enable_logging=False):
|
||||||
|
|
@ -1124,7 +1124,6 @@ async def merge_nodes_and_edges(
|
||||||
llm_response_cache: LLM response cache
|
llm_response_cache: LLM response cache
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
# Collect all nodes and edges from all chunks
|
# Collect all nodes and edges from all chunks
|
||||||
all_nodes = defaultdict(list)
|
all_nodes = defaultdict(list)
|
||||||
all_edges = defaultdict(list)
|
all_edges = defaultdict(list)
|
||||||
|
|
@ -1145,92 +1144,99 @@ async def merge_nodes_and_edges(
|
||||||
|
|
||||||
# Merge nodes and edges
|
# Merge nodes and edges
|
||||||
async with pipeline_status_lock:
|
async with pipeline_status_lock:
|
||||||
log_message = (
|
log_message = f"Merging stage {current_file_number}/{total_files}: {file_path}"
|
||||||
f"Merging stage {current_file_number}/{total_files}: {file_path}"
|
|
||||||
)
|
|
||||||
logger.info(log_message)
|
logger.info(log_message)
|
||||||
pipeline_status["latest_message"] = log_message
|
pipeline_status["latest_message"] = log_message
|
||||||
pipeline_status["history_messages"].append(log_message)
|
pipeline_status["history_messages"].append(log_message)
|
||||||
|
|
||||||
# Process and update all entities at once
|
# Process and update all entities and relationships in parallel
|
||||||
log_message = f"Updating {total_entities_count} entities {current_file_number}/{total_files}: {file_path}"
|
log_message = f"Updating {total_entities_count} entities and {total_relations_count} relations {current_file_number}/{total_files}: {file_path}"
|
||||||
logger.info(log_message)
|
logger.info(log_message)
|
||||||
if pipeline_status is not None:
|
if pipeline_status is not None:
|
||||||
async with pipeline_status_lock:
|
async with pipeline_status_lock:
|
||||||
pipeline_status["latest_message"] = log_message
|
pipeline_status["latest_message"] = log_message
|
||||||
pipeline_status["history_messages"].append(log_message)
|
pipeline_status["history_messages"].append(log_message)
|
||||||
|
|
||||||
|
# Get max async tasks limit from global_config for semaphore control
|
||||||
|
llm_model_max_async = global_config.get("llm_model_max_async", 4)
|
||||||
|
semaphore = asyncio.Semaphore(llm_model_max_async)
|
||||||
|
|
||||||
async def _locked_process_entity_name(entity_name, entities):
|
async def _locked_process_entity_name(entity_name, entities):
|
||||||
async with get_graph_db_lock_keyed([entity_name], enable_logging=False):
|
async with semaphore:
|
||||||
entity_data = await _merge_nodes_then_upsert(
|
async with get_graph_db_lock_keyed([entity_name], enable_logging=False):
|
||||||
entity_name,
|
entity_data = await _merge_nodes_then_upsert(
|
||||||
entities,
|
entity_name,
|
||||||
knowledge_graph_inst,
|
entities,
|
||||||
global_config,
|
knowledge_graph_inst,
|
||||||
pipeline_status,
|
global_config,
|
||||||
pipeline_status_lock,
|
pipeline_status,
|
||||||
llm_response_cache,
|
pipeline_status_lock,
|
||||||
)
|
llm_response_cache,
|
||||||
if entity_vdb is not None:
|
)
|
||||||
data_for_vdb = {
|
if entity_vdb is not None:
|
||||||
compute_mdhash_id(entity_data["entity_name"], prefix="ent-"): {
|
data_for_vdb = {
|
||||||
"entity_name": entity_data["entity_name"],
|
compute_mdhash_id(entity_data["entity_name"], prefix="ent-"): {
|
||||||
"entity_type": entity_data["entity_type"],
|
"entity_name": entity_data["entity_name"],
|
||||||
"content": f"{entity_data['entity_name']}\n{entity_data['description']}",
|
"entity_type": entity_data["entity_type"],
|
||||||
"source_id": entity_data["source_id"],
|
"content": f"{entity_data['entity_name']}\n{entity_data['description']}",
|
||||||
"file_path": entity_data.get("file_path", "unknown_source"),
|
"source_id": entity_data["source_id"],
|
||||||
|
"file_path": entity_data.get("file_path", "unknown_source"),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
await entity_vdb.upsert(data_for_vdb)
|
||||||
await entity_vdb.upsert(data_for_vdb)
|
return entity_data
|
||||||
return entity_data
|
|
||||||
|
|
||||||
tasks = []
|
|
||||||
for entity_name, entities in all_nodes.items():
|
|
||||||
tasks.append(asyncio.create_task(_locked_process_entity_name(entity_name, entities)))
|
|
||||||
await asyncio.gather(*tasks)
|
|
||||||
|
|
||||||
# Process and update all relationships at once
|
|
||||||
log_message = f"Updating {total_relations_count} relations {current_file_number}/{total_files}: {file_path}"
|
|
||||||
logger.info(log_message)
|
|
||||||
if pipeline_status is not None:
|
|
||||||
async with pipeline_status_lock:
|
|
||||||
pipeline_status["latest_message"] = log_message
|
|
||||||
pipeline_status["history_messages"].append(log_message)
|
|
||||||
|
|
||||||
async def _locked_process_edges(edge_key, edges):
|
async def _locked_process_edges(edge_key, edges):
|
||||||
async with get_graph_db_lock_keyed(f"{edge_key[0]}-{edge_key[1]}", enable_logging=False):
|
async with semaphore:
|
||||||
edge_data = await _merge_edges_then_upsert(
|
async with get_graph_db_lock_keyed(
|
||||||
edge_key[0],
|
f"{edge_key[0]}-{edge_key[1]}", enable_logging=False
|
||||||
edge_key[1],
|
):
|
||||||
edges,
|
edge_data = await _merge_edges_then_upsert(
|
||||||
knowledge_graph_inst,
|
edge_key[0],
|
||||||
global_config,
|
edge_key[1],
|
||||||
pipeline_status,
|
edges,
|
||||||
pipeline_status_lock,
|
knowledge_graph_inst,
|
||||||
llm_response_cache,
|
global_config,
|
||||||
)
|
pipeline_status,
|
||||||
if edge_data is None:
|
pipeline_status_lock,
|
||||||
return None
|
llm_response_cache,
|
||||||
|
)
|
||||||
|
if edge_data is None:
|
||||||
|
return None
|
||||||
|
|
||||||
if relationships_vdb is not None:
|
if relationships_vdb is not None:
|
||||||
data_for_vdb = {
|
data_for_vdb = {
|
||||||
compute_mdhash_id(edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-"): {
|
compute_mdhash_id(
|
||||||
"src_id": edge_data["src_id"],
|
edge_data["src_id"] + edge_data["tgt_id"], prefix="rel-"
|
||||||
"tgt_id": edge_data["tgt_id"],
|
): {
|
||||||
"keywords": edge_data["keywords"],
|
"src_id": edge_data["src_id"],
|
||||||
"content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}",
|
"tgt_id": edge_data["tgt_id"],
|
||||||
"source_id": edge_data["source_id"],
|
"keywords": edge_data["keywords"],
|
||||||
"file_path": edge_data.get("file_path", "unknown_source"),
|
"content": f"{edge_data['src_id']}\t{edge_data['tgt_id']}\n{edge_data['keywords']}\n{edge_data['description']}",
|
||||||
|
"source_id": edge_data["source_id"],
|
||||||
|
"file_path": edge_data.get("file_path", "unknown_source"),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
await relationships_vdb.upsert(data_for_vdb)
|
||||||
await relationships_vdb.upsert(data_for_vdb)
|
return edge_data
|
||||||
return edge_data
|
|
||||||
|
|
||||||
|
# Create a single task queue for both entities and edges
|
||||||
tasks = []
|
tasks = []
|
||||||
|
|
||||||
|
# Add entity processing tasks
|
||||||
|
for entity_name, entities in all_nodes.items():
|
||||||
|
tasks.append(
|
||||||
|
asyncio.create_task(_locked_process_entity_name(entity_name, entities))
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add edge processing tasks
|
||||||
for edge_key, edges in all_edges.items():
|
for edge_key, edges in all_edges.items():
|
||||||
tasks.append(asyncio.create_task(_locked_process_edges(edge_key, edges)))
|
tasks.append(asyncio.create_task(_locked_process_edges(edge_key, edges)))
|
||||||
|
|
||||||
|
# Execute all tasks in parallel with semaphore control
|
||||||
await asyncio.gather(*tasks)
|
await asyncio.gather(*tasks)
|
||||||
|
|
||||||
|
|
||||||
async def extract_entities(
|
async def extract_entities(
|
||||||
chunks: dict[str, TextChunkSchema],
|
chunks: dict[str, TextChunkSchema],
|
||||||
global_config: dict[str, str],
|
global_config: dict[str, str],
|
||||||
|
|
|
||||||
Loading…
Add table
Reference in a new issue