Merge branch 'context-builder'

This commit is contained in:
yangdx 2025-07-23 16:14:44 +08:00
commit 00d7bc80bf

View file

@ -1329,6 +1329,16 @@ async def merge_nodes_and_edges(
for edge_key, edges in all_edges.items(): for edge_key, edges in all_edges.items():
tasks.append(asyncio.create_task(_locked_process_edges(edge_key, edges))) tasks.append(asyncio.create_task(_locked_process_edges(edge_key, edges)))
# Check if there are any tasks to process
if not tasks:
log_message = f"No entities or relationships to process for {file_path}"
logger.info(log_message)
if pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
return
# Execute all tasks in parallel with semaphore control and early failure detection # Execute all tasks in parallel with semaphore control and early failure detection
done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION) done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_EXCEPTION)
@ -2020,74 +2030,73 @@ async def _build_query_context(
# Unified token control system - Apply precise token limits to entities and relations # Unified token control system - Apply precise token limits to entities and relations
tokenizer = text_chunks_db.global_config.get("tokenizer") tokenizer = text_chunks_db.global_config.get("tokenizer")
if tokenizer: # Get new token limits from query_param (with fallback to global_config)
# Get new token limits from query_param (with fallback to global_config) max_entity_tokens = getattr(
max_entity_tokens = getattr( query_param,
query_param, "max_entity_tokens",
"max_entity_tokens", text_chunks_db.global_config.get(
text_chunks_db.global_config.get( "max_entity_tokens", DEFAULT_MAX_ENTITY_TOKENS
"max_entity_tokens", DEFAULT_MAX_ENTITY_TOKENS ),
), )
) max_relation_tokens = getattr(
max_relation_tokens = getattr( query_param,
query_param, "max_relation_tokens",
"max_relation_tokens", text_chunks_db.global_config.get(
text_chunks_db.global_config.get( "max_relation_tokens", DEFAULT_MAX_RELATION_TOKENS
"max_relation_tokens", DEFAULT_MAX_RELATION_TOKENS ),
), )
) max_total_tokens = getattr(
max_total_tokens = getattr( query_param,
query_param, "max_total_tokens",
"max_total_tokens", text_chunks_db.global_config.get(
text_chunks_db.global_config.get( "max_total_tokens", DEFAULT_MAX_TOTAL_TOKENS
"max_total_tokens", DEFAULT_MAX_TOTAL_TOKENS ),
), )
)
# Truncate entities based on complete JSON serialization # Truncate entities based on complete JSON serialization
if entities_context: if entities_context:
original_entity_count = len(entities_context) original_entity_count = len(entities_context)
# Process entities context to replace GRAPH_FIELD_SEP with : in file_path fields # Process entities context to replace GRAPH_FIELD_SEP with : in file_path fields
for entity in entities_context: for entity in entities_context:
if "file_path" in entity and entity["file_path"]: if "file_path" in entity and entity["file_path"]:
entity["file_path"] = entity["file_path"].replace( entity["file_path"] = entity["file_path"].replace(
GRAPH_FIELD_SEP, ";" GRAPH_FIELD_SEP, ";"
)
entities_context = truncate_list_by_token_size(
entities_context,
key=lambda x: json.dumps(x, ensure_ascii=False),
max_token_size=max_entity_tokens,
tokenizer=tokenizer,
)
if len(entities_context) < original_entity_count:
logger.debug(
f"Truncated entities: {original_entity_count} -> {len(entities_context)} (entity max tokens: {max_entity_tokens})"
) )
# Truncate relations based on complete JSON serialization entities_context = truncate_list_by_token_size(
if relations_context: entities_context,
original_relation_count = len(relations_context) key=lambda x: json.dumps(x, ensure_ascii=False),
max_token_size=max_entity_tokens,
# Process relations context to replace GRAPH_FIELD_SEP with : in file_path fields tokenizer=tokenizer,
for relation in relations_context: )
if "file_path" in relation and relation["file_path"]: if len(entities_context) < original_entity_count:
relation["file_path"] = relation["file_path"].replace( logger.debug(
GRAPH_FIELD_SEP, ";" f"Truncated entities: {original_entity_count} -> {len(entities_context)} (entity max tokens: {max_entity_tokens})"
)
relations_context = truncate_list_by_token_size(
relations_context,
key=lambda x: json.dumps(x, ensure_ascii=False),
max_token_size=max_relation_tokens,
tokenizer=tokenizer,
) )
if len(relations_context) < original_relation_count:
logger.debug( # Truncate relations based on complete JSON serialization
f"Truncated relations: {original_relation_count} -> {len(relations_context)} (relation max tokens: {max_relation_tokens})" if relations_context:
original_relation_count = len(relations_context)
# Process relations context to replace GRAPH_FIELD_SEP with : in file_path fields
for relation in relations_context:
if "file_path" in relation and relation["file_path"]:
relation["file_path"] = relation["file_path"].replace(
GRAPH_FIELD_SEP, ";"
) )
relations_context = truncate_list_by_token_size(
relations_context,
key=lambda x: json.dumps(x, ensure_ascii=False),
max_token_size=max_relation_tokens,
tokenizer=tokenizer,
)
if len(relations_context) < original_relation_count:
logger.debug(
f"Truncated relations: {original_relation_count} -> {len(relations_context)} (relation max tokens: {max_relation_tokens})"
)
# After truncation, get text chunks based on final entities and relations # After truncation, get text chunks based on final entities and relations
logger.info("Getting text chunks based on truncated entities and relations...") logger.info("Getting text chunks based on truncated entities and relations...")
@ -2145,9 +2154,9 @@ async def _build_query_context(
if chunks: if chunks:
all_chunks.extend(chunks) all_chunks.extend(chunks)
# Apply token processing to chunks if tokenizer is available # Apply token processing to chunks
text_units_context = [] text_units_context = []
if tokenizer and all_chunks: if all_chunks:
# Calculate dynamic token limit for text chunks # Calculate dynamic token limit for text chunks
entities_str = json.dumps(entities_context, ensure_ascii=False) entities_str = json.dumps(entities_context, ensure_ascii=False)
relations_str = json.dumps(relations_context, ensure_ascii=False) relations_str = json.dumps(relations_context, ensure_ascii=False)
@ -2600,7 +2609,7 @@ async def _get_edge_data(
combined = { combined = {
"src_id": k["src_id"], "src_id": k["src_id"],
"tgt_id": k["tgt_id"], "tgt_id": k["tgt_id"],
"rank": edge_degrees_dict.get(pair, k.get("rank", 0)), "rank": edge_degrees_dict.get(pair, 0),
"created_at": k.get("created_at", None), "created_at": k.get("created_at", None),
**edge_props, **edge_props,
} }