Add chunk tracking system to monitor chunk sources and frequencies

• Track chunk sources (E/R/C types)
• Log frequency and order metadata
• Preserve chunk_id through processing
• Add debug logging for chunk tracking
• Handle rerank and truncation operations
yangdx 2025-08-14 22:58:26 +08:00
parent a8b7890470
commit 4a19d0de25
2 changed files with 86 additions and 5 deletions
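
For orientation, a minimal sketch of the bookkeeping this commit introduces, assuming the entry shape and log format shown in the diff below (the chunk ids here are hypothetical): each tracked chunk id maps to its source type (E = entity, R = relation, C = vector chunk), an occurrence frequency, and a 1-based order, and the final log line joins one <source><frequency>/<order> token per chunk.

chunk_tracking = {
    "chunk-aaa": {"source": "E", "frequency": 5, "order": 2},  # hypothetical chunk ids
    "chunk-bbb": {"source": "R", "frequency": 2, "order": 1},
    "chunk-ccc": {"source": "C", "frequency": 1, "order": 1},
}

tokens = [
    f"{info['source']}{info['frequency']}/{info['order']}"
    for info in chunk_tracking.values()
]
print("chunks: " + " ".join(tokens))  # -> chunks: E5/2 R2/1 C1/1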

View file

@@ -2107,6 +2107,9 @@ async def _build_query_context(
global_entities = []
global_relations = []
# Track chunk sources and metadata for final logging
chunk_tracking = {} # chunk_id -> {source, frequency, order}
# Handle local and global modes
if query_param.mode == "local":
local_entities, local_relations = await _get_node_data(
@@ -2145,6 +2148,15 @@ async def _build_query_context(
chunks_vdb,
query_param,
)
# Track vector chunks with source metadata
for i, chunk in enumerate(vector_chunks):
chunk_id = chunk.get("chunk_id") or chunk.get("id")
if chunk_id:
chunk_tracking[chunk_id] = {
"source": "C",
"frequency": 1, # Vector chunks always have frequency 1
"order": i + 1, # 1-based order in vector search results
}
# Use round-robin merge to combine local and global data fairly
final_entities = []
@@ -2342,6 +2354,7 @@ async def _build_query_context(
seen_edges.add(pair)
# Get text chunks based on final filtered data
logger.info(f"chunk_tracking: {chunk_tracking}")
if final_node_datas:
entity_chunks = await _find_related_text_unit_from_entities(
final_node_datas,
@@ -2350,8 +2363,9 @@ async def _build_query_context(
knowledge_graph_inst,
query,
chunks_vdb,
chunk_tracking=chunk_tracking,
)
logger.info(f"chunk_tracking: {chunk_tracking}")
if final_edge_datas:
relation_chunks = await _find_related_text_unit_from_relations(
final_edge_datas,
@@ -2360,7 +2374,9 @@ async def _build_query_context(
entity_chunks,
query,
chunks_vdb,
chunk_tracking=chunk_tracking,
)
logger.info(f"chunk_tracking: {chunk_tracking}")
# Round-robin merge chunks from different sources with deduplication by chunk_id
merged_chunks = []
@@ -2379,6 +2395,7 @@ async def _build_query_context(
{
"content": chunk["content"],
"file_path": chunk.get("file_path", "unknown_source"),
"chunk_id": chunk_id,
}
)
@@ -2392,6 +2409,7 @@ async def _build_query_context(
{
"content": chunk["content"],
"file_path": chunk.get("file_path", "unknown_source"),
"chunk_id": chunk_id,
}
)
@@ -2405,6 +2423,7 @@ async def _build_query_context(
{
"content": chunk["content"],
"file_path": chunk.get("file_path", "unknown_source"),
"chunk_id": chunk_id,
}
)
@@ -2523,6 +2542,24 @@ async def _build_query_context(
if not entities_context and not relations_context:
return None
# Output chunk tracking information
# format: <source><frequency>/<order> (e.g., E5/2 R2/1 C1/1)
if truncated_chunks and chunk_tracking:
chunk_tracking_log = []
for chunk in truncated_chunks:
chunk_id = chunk.get("chunk_id")
if chunk_id and chunk_id in chunk_tracking:
tracking_info = chunk_tracking[chunk_id]
source = tracking_info["source"]
frequency = tracking_info["frequency"]
order = tracking_info["order"]
chunk_tracking_log.append(f"{source}{frequency}/{order}")
else:
chunk_tracking_log.append("?0/0")
if chunk_tracking_log:
logger.info(f"chunks: {' '.join(chunk_tracking_log)}")
entities_str = json.dumps(entities_context, ensure_ascii=False)
relations_str = json.dumps(relations_context, ensure_ascii=False)
text_units_str = json.dumps(text_units_context, ensure_ascii=False)
@@ -2672,6 +2709,7 @@ async def _find_related_text_unit_from_entities(
knowledge_graph_inst: BaseGraphStorage,
query: str = None,
chunks_vdb: BaseVectorStorage = None,
chunk_tracking: dict = None,
):
"""
Find text chunks related to entities using configurable chunk selection method.
@@ -2801,15 +2839,23 @@ async def _find_related_text_unit_from_entities(
) # Remove duplicates while preserving order
chunk_data_list = await text_chunks_db.get_by_ids(unique_chunk_ids)
# Step 6: Build result chunks with valid data
# Step 6: Build result chunks with valid data and update chunk tracking
result_chunks = []
for chunk_id, chunk_data in zip(unique_chunk_ids, chunk_data_list):
for i, (chunk_id, chunk_data) in enumerate(zip(unique_chunk_ids, chunk_data_list)):
if chunk_data is not None and "content" in chunk_data:
chunk_data_copy = chunk_data.copy()
chunk_data_copy["source_type"] = "entity"
chunk_data_copy["chunk_id"] = chunk_id # Add chunk_id for deduplication
result_chunks.append(chunk_data_copy)
# Update chunk tracking if provided
if chunk_tracking is not None:
chunk_tracking[chunk_id] = {
"source": "E",
"frequency": chunk_occurrence_count.get(chunk_id, 1),
"order": i + 1, # 1-based order in final entity-related results
}
return result_chunks
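
An aside on the frequency field populated above: chunk_occurrence_count is built elsewhere in this function and is not visible in the hunk, but it presumably counts how many of the selected entities reference a given chunk. A hedged, illustrative computation of such a count (names and data are hypothetical, not taken from the repository):

from collections import Counter

# Each selected entity carries the chunk ids it was extracted from (hypothetical data).
entity_source_chunks = [
    ["c1", "c2"],  # entity 1
    ["c1"],        # entity 2
    ["c1", "c3"],  # entity 3
]
chunk_occurrence_count = Counter(
    cid for chunk_ids in entity_source_chunks for cid in chunk_ids
)
# Counter({'c1': 3, 'c2': 1, 'c3': 1}); c1 would then surface in the log as E3/<order>.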
@@ -2911,6 +2957,7 @@ async def _find_related_text_unit_from_relations(
entity_chunks: list[dict] = None,
query: str = None,
chunks_vdb: BaseVectorStorage = None,
chunk_tracking: dict = None,
):
"""
Find text chunks related to relationships using configurable chunk selection method.
@@ -3087,15 +3134,23 @@ async def _find_related_text_unit_from_relations(
) # Remove duplicates while preserving order
chunk_data_list = await text_chunks_db.get_by_ids(unique_chunk_ids)
# Step 6: Build result chunks with valid data
# Step 6: Build result chunks with valid data and update chunk tracking
result_chunks = []
for chunk_id, chunk_data in zip(unique_chunk_ids, chunk_data_list):
for i, (chunk_id, chunk_data) in enumerate(zip(unique_chunk_ids, chunk_data_list)):
if chunk_data is not None and "content" in chunk_data:
chunk_data_copy = chunk_data.copy()
chunk_data_copy["source_type"] = "relationship"
chunk_data_copy["chunk_id"] = chunk_id # Add chunk_id for deduplication
result_chunks.append(chunk_data_copy)
# Update chunk tracking if provided
if chunk_tracking is not None:
chunk_tracking[chunk_id] = {
"source": "R",
"frequency": chunk_occurrence_count.get(chunk_id, 1),
"order": i + 1, # 1-based order in final relation-related results
}
return result_chunks

View file

@@ -1901,6 +1901,13 @@ async def process_chunks_unified(
# 1. Apply reranking if enabled and query is provided
if query_param.enable_rerank and query and unique_chunks:
# Save the chunk_id field, because rerank may lose it
chunk_ids = {}
for chunk in unique_chunks:
chunk_id = chunk.get("chunk_id")
if chunk_id:
chunk_ids[id(chunk)] = chunk_id
rerank_top_k = query_param.chunk_top_k or len(unique_chunks)
unique_chunks = await apply_rerank_if_enabled(
query=query,
@@ -1910,6 +1917,11 @@ async def process_chunks_unified(
top_n=rerank_top_k,
)
# Restore the chunk_id field
for chunk in unique_chunks:
if id(chunk) in chunk_ids:
chunk["chunk_id"] = chunk_ids[id(chunk)]
# 2. Filter by minimum rerank score if reranking is enabled
if query_param.enable_rerank and unique_chunks:
min_rerank_score = global_config.get("min_rerank_score", 0.5)
@@ -1956,12 +1968,26 @@ async def process_chunks_unified(
)
original_count = len(unique_chunks)
# Keep the chunk_id field, because truncate_list_by_token_size will lose it
chunk_ids_map = {}
for i, chunk in enumerate(unique_chunks):
chunk_id = chunk.get("chunk_id")
if chunk_id:
chunk_ids_map[i] = chunk_id
unique_chunks = truncate_list_by_token_size(
unique_chunks,
key=lambda x: x.get("content", ""),
max_token_size=chunk_token_limit,
tokenizer=tokenizer,
)
# Restore the chunk_id field
for i, chunk in enumerate(unique_chunks):
if i in chunk_ids_map:
chunk["chunk_id"] = chunk_ids_map[i]
logger.debug(
f"Token truncation: {len(unique_chunks)} chunks from {original_count} "
f"(chunk available tokens: {chunk_token_limit}, source: {source_type})"