From 4a19d0de251e3a33dabec4eb624733e31d189193 Mon Sep 17 00:00:00 2001
From: yangdx
Date: Thu, 14 Aug 2025 22:58:26 +0800
Subject: [PATCH] Add chunk tracking system to monitor chunk sources and frequencies
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

• Track chunk sources (E/R/C types)
• Log frequency and order metadata
• Preserve chunk_id through processing
• Add debug logging for chunk tracking
• Handle rerank and truncation operations
---
 lightrag/operate.py | 65 +++++++++++++++++++++++++++++++++++++++++----
 lightrag/utils.py   | 26 ++++++++++++++++++
 2 files changed, 86 insertions(+), 5 deletions(-)

diff --git a/lightrag/operate.py b/lightrag/operate.py
index dafc9e84..d14b6919 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -2107,6 +2107,9 @@ async def _build_query_context(
     global_entities = []
     global_relations = []

+    # Track chunk sources and metadata for final logging
+    chunk_tracking = {}  # chunk_id -> {source, frequency, order}
+
     # Handle local and global modes
     if query_param.mode == "local":
         local_entities, local_relations = await _get_node_data(
@@ -2145,6 +2148,15 @@
             chunks_vdb,
             query_param,
         )
+        # Track vector chunks with source metadata
+        for i, chunk in enumerate(vector_chunks):
+            chunk_id = chunk.get("chunk_id") or chunk.get("id")
+            if chunk_id:
+                chunk_tracking[chunk_id] = {
+                    "source": "C",
+                    "frequency": 1,  # Vector chunks always have frequency 1
+                    "order": i + 1,  # 1-based order in vector search results
+                }

     # Use round-robin merge to combine local and global data fairly
     final_entities = []
@@ -2342,6 +2354,7 @@
             seen_edges.add(pair)

     # Get text chunks based on final filtered data
+    logger.info(f"chunk_tracking: {chunk_tracking}")
     if final_node_datas:
         entity_chunks = await _find_related_text_unit_from_entities(
             final_node_datas,
@@ -2350,8 +2363,9 @@
             knowledge_graph_inst,
             query,
             chunks_vdb,
+            chunk_tracking=chunk_tracking,
         )
-
+    logger.info(f"chunk_tracking: {chunk_tracking}")
     if final_edge_datas:
         relation_chunks = await _find_related_text_unit_from_relations(
             final_edge_datas,
@@ -2360,7 +2374,9 @@
             entity_chunks,
             query,
             chunks_vdb,
+            chunk_tracking=chunk_tracking,
         )
+    logger.info(f"chunk_tracking: {chunk_tracking}")

     # Round-robin merge chunks from different sources with deduplication by chunk_id
     merged_chunks = []
@@ -2379,6 +2395,7 @@
                 {
                     "content": chunk["content"],
                     "file_path": chunk.get("file_path", "unknown_source"),
+                    "chunk_id": chunk_id,
                 }
             )

@@ -2392,6 +2409,7 @@
                 {
                     "content": chunk["content"],
                     "file_path": chunk.get("file_path", "unknown_source"),
+                    "chunk_id": chunk_id,
                 }
             )

@@ -2405,6 +2423,7 @@
                 {
                     "content": chunk["content"],
                     "file_path": chunk.get("file_path", "unknown_source"),
+                    "chunk_id": chunk_id,
                 }
             )

@@ -2523,6 +2542,24 @@
     if not entities_context and not relations_context:
         return None

+    # Output chunk tracking information
+    # Format: <source><frequency>/<order> (e.g., E5/2 R2/1 C1/1)
+    if truncated_chunks and chunk_tracking:
+        chunk_tracking_log = []
+        for chunk in truncated_chunks:
+            chunk_id = chunk.get("chunk_id")
+            if chunk_id and chunk_id in chunk_tracking:
+                tracking_info = chunk_tracking[chunk_id]
+                source = tracking_info["source"]
+                frequency = tracking_info["frequency"]
+                order = tracking_info["order"]
+                chunk_tracking_log.append(f"{source}{frequency}/{order}")
+            else:
+                chunk_tracking_log.append("?0/0")
+
+        if chunk_tracking_log:
+            logger.info(f"chunks: {' '.join(chunk_tracking_log)}")
+
     entities_str = json.dumps(entities_context, ensure_ascii=False)
     relations_str = json.dumps(relations_context, ensure_ascii=False)
     text_units_str = json.dumps(text_units_context, ensure_ascii=False)
@@ -2672,6 +2709,7 @@ async def _find_related_text_unit_from_entities(
     knowledge_graph_inst: BaseGraphStorage,
     query: str = None,
     chunks_vdb: BaseVectorStorage = None,
+    chunk_tracking: dict = None,
 ):
     """
     Find text chunks related to entities using configurable chunk selection method.
@@ -2801,15 +2839,23 @@
     )  # Remove duplicates while preserving order
     chunk_data_list = await text_chunks_db.get_by_ids(unique_chunk_ids)

-    # Step 6: Build result chunks with valid data
+    # Step 6: Build result chunks with valid data and update chunk tracking
     result_chunks = []
-    for chunk_id, chunk_data in zip(unique_chunk_ids, chunk_data_list):
+    for i, (chunk_id, chunk_data) in enumerate(zip(unique_chunk_ids, chunk_data_list)):
         if chunk_data is not None and "content" in chunk_data:
             chunk_data_copy = chunk_data.copy()
             chunk_data_copy["source_type"] = "entity"
             chunk_data_copy["chunk_id"] = chunk_id  # Add chunk_id for deduplication
             result_chunks.append(chunk_data_copy)

+            # Update chunk tracking if provided
+            if chunk_tracking is not None:
+                chunk_tracking[chunk_id] = {
+                    "source": "E",
+                    "frequency": chunk_occurrence_count.get(chunk_id, 1),
+                    "order": i + 1,  # 1-based order in final entity-related results
+                }
+
     return result_chunks


@@ -2911,6 +2957,7 @@ async def _find_related_text_unit_from_relations(
     entity_chunks: list[dict] = None,
     query: str = None,
     chunks_vdb: BaseVectorStorage = None,
+    chunk_tracking: dict = None,
 ):
     """
     Find text chunks related to relationships using configurable chunk selection method.
@@ -3087,15 +3134,23 @@
     )  # Remove duplicates while preserving order
     chunk_data_list = await text_chunks_db.get_by_ids(unique_chunk_ids)

-    # Step 6: Build result chunks with valid data
+    # Step 6: Build result chunks with valid data and update chunk tracking
     result_chunks = []
-    for chunk_id, chunk_data in zip(unique_chunk_ids, chunk_data_list):
+    for i, (chunk_id, chunk_data) in enumerate(zip(unique_chunk_ids, chunk_data_list)):
         if chunk_data is not None and "content" in chunk_data:
             chunk_data_copy = chunk_data.copy()
             chunk_data_copy["source_type"] = "relationship"
             chunk_data_copy["chunk_id"] = chunk_id  # Add chunk_id for deduplication
             result_chunks.append(chunk_data_copy)

+            # Update chunk tracking if provided
+            if chunk_tracking is not None:
+                chunk_tracking[chunk_id] = {
+                    "source": "R",
+                    "frequency": chunk_occurrence_count.get(chunk_id, 1),
+                    "order": i + 1,  # 1-based order in final relation-related results
+                }
+
     return result_chunks


diff --git a/lightrag/utils.py b/lightrag/utils.py
index 4d2a5289..340e4251 100644
--- a/lightrag/utils.py
+++ b/lightrag/utils.py
@@ -1901,6 +1901,13 @@ async def process_chunks_unified(

     # 1. Apply reranking if enabled and query is provided
     if query_param.enable_rerank and query and unique_chunks:
+        # Preserve the chunk_id field, since rerank may drop it
+        chunk_ids = {}
+        for chunk in unique_chunks:
+            chunk_id = chunk.get("chunk_id")
+            if chunk_id:
+                chunk_ids[id(chunk)] = chunk_id
+
         rerank_top_k = query_param.chunk_top_k or len(unique_chunks)
         unique_chunks = await apply_rerank_if_enabled(
             query=query,
@@ -1910,6 +1917,11 @@
             top_n=rerank_top_k,
         )

+        # Restore the chunk_id field
+        for chunk in unique_chunks:
+            if id(chunk) in chunk_ids:
+                chunk["chunk_id"] = chunk_ids[id(chunk)]
+
     # 2. Filter by minimum rerank score if reranking is enabled
     if query_param.enable_rerank and unique_chunks:
         min_rerank_score = global_config.get("min_rerank_score", 0.5)
@@ -1956,12 +1968,26 @@
         )

         original_count = len(unique_chunks)
+
+        # Keep the chunk_id field, because truncate_list_by_token_size will drop it
+        chunk_ids_map = {}
+        for i, chunk in enumerate(unique_chunks):
+            chunk_id = chunk.get("chunk_id")
+            if chunk_id:
+                chunk_ids_map[i] = chunk_id
+
         unique_chunks = truncate_list_by_token_size(
             unique_chunks,
             key=lambda x: x.get("content", ""),
             max_token_size=chunk_token_limit,
             tokenizer=tokenizer,
         )
+
+        # Restore the chunk_id field
+        for i, chunk in enumerate(unique_chunks):
+            if i in chunk_ids_map:
+                chunk["chunk_id"] = chunk_ids_map[i]
+
         logger.debug(
             f"Token truncation: {len(unique_chunks)} chunks from {original_count} "
             f"(chunk available tokens: {chunk_token_limit}, source: {source_type})"
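
A note on reading the new log line: each token emitted by the "chunks: ..." logger.info call encodes one retained chunk as <source><frequency>/<order>, where source is E (entity-derived), R (relationship-derived), or C (vector search), frequency is the count reported by that source (always 1 for C), order is the 1-based rank within that source's result list, and untracked chunks fall back to ?0/0. A minimal decoding sketch, not part of the patch; the ChunkTrace and parse_chunk_trace names are invented for illustration:

from typing import NamedTuple


class ChunkTrace(NamedTuple):
    source: str     # "E", "R", "C", or "?" for untracked chunks
    frequency: int  # occurrence count reported by the source ("C" is always 1)
    order: int      # 1-based rank within that source's result list


def parse_chunk_trace(token: str) -> ChunkTrace:
    """Decode one token of the 'chunks: ...' log line, e.g. 'E5/2'."""
    frequency, order = token[1:].split("/")
    return ChunkTrace(token[0], int(frequency), int(order))


# The example line "chunks: E5/2 R2/1 C1/1" decodes to three traces:
traces = [parse_chunk_trace(t) for t in "E5/2 R2/1 C1/1".split()]
assert traces[0] == ChunkTrace("E", 5, 2)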
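The chunk_tracking dict is populated in _build_query_context for vector hits and then handed to both _find_related_text_unit_from_* helpers, each of which writes entries by plain dict assignment, so a "C" entry is replaced when the same chunk also surfaces via entities ("E") and again via relations ("R"). A simplified illustration of that bookkeeping under those assumptions; the chunk IDs and counts below are invented sample data:

# Shared tracking dict, as in the patch: chunk_id -> {source, frequency, order}
chunk_tracking: dict[str, dict] = {}

# Vector search hits ("C"): frequency is fixed at 1, order is the vector rank.
for rank, chunk_id in enumerate(["chunk-a", "chunk-b"], start=1):
    chunk_tracking[chunk_id] = {"source": "C", "frequency": 1, "order": rank}

# Entity-derived chunks ("E") simply reassign the key, replacing any "C" entry.
chunk_tracking["chunk-a"] = {"source": "E", "frequency": 3, "order": 1}

# Relation-derived chunks ("R") are written last and win the same way.
chunk_tracking["chunk-b"] = {"source": "R", "frequency": 2, "order": 1}

assert chunk_tracking["chunk-a"]["source"] == "E"
assert chunk_tracking["chunk-b"]["source"] == "R"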
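The utils.py changes carry chunk_id across calls that may strip extra fields using two different keys: object identity around apply_rerank_if_enabled (which may reorder but is assumed to return the same dict objects) and list index around truncate_list_by_token_size (which keeps order and only trims the tail). A standalone sketch of both patterns, with invented helper names and a stand-in transform:

from typing import Any, Callable

Chunks = list[dict[str, Any]]


def restore_ids_by_identity(chunks: Chunks, transform: Callable[[Chunks], Chunks]) -> Chunks:
    """Works when the transform reorders but returns the same dict objects."""
    saved = {id(c): c["chunk_id"] for c in chunks if c.get("chunk_id")}
    result = transform(chunks)
    for c in result:
        if id(c) in saved:
            c["chunk_id"] = saved[id(c)]
    return result


def restore_ids_by_index(chunks: Chunks, transform: Callable[[Chunks], Chunks]) -> Chunks:
    """Works when the transform keeps order and only drops items from the tail."""
    saved = {i: c["chunk_id"] for i, c in enumerate(chunks) if c.get("chunk_id")}
    result = transform(chunks)
    for i, c in enumerate(result):
        if i in saved:
            c["chunk_id"] = saved[i]
    return result


def strip_extra_fields(chunks: Chunks) -> Chunks:
    """Stand-in for a call that rebuilds chunks and loses chunk_id."""
    return [{"content": c["content"]} for c in chunks]


chunks = [{"content": "some text", "chunk_id": "chunk-a"}]
assert restore_ids_by_index(chunks, strip_extra_fields)[0]["chunk_id"] == "chunk-a"

If a reranker returns fresh dict objects instead of the originals, the identity-keyed variant silently skips restoration, which is the same failure mode the id()-based bookkeeping in the patch would have.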