fix: Sort cached extraction results by create_time within each chunk

This ensures that KG rebuilds process chunks in the creation order of each chunk's first extraction result and replay the extraction results within each chunk earliest-first.
yangdx 2025-08-25 21:41:33 +08:00
parent 882d6857d8
commit 15cdd0dd8f
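
For illustration, here is a minimal standalone sketch of the two-level ordering this commit introduces, using hypothetical chunk IDs and timestamps rather than the repository's actual cache entries:

    # chunk_id -> list of (extraction_result, create_time) tuples
    cached = {
        "chunk-B": [("rel-B2", 40.0), ("ent-B1", 20.0)],
        "chunk-A": [("ent-A1", 30.0), ("rel-A2", 35.0)],
    }

    # Level 1: sort results within each chunk by create_time (earliest first)
    for results in cached.values():
        results.sort(key=lambda item: item[1])

    # Level 2: order chunks by their earliest result's create_time; Python dicts
    # preserve insertion order, so rebuilding the dict fixes the key order
    ordered = {
        chunk_id: [text for text, _ in results]
        for chunk_id, results in sorted(cached.items(), key=lambda kv: kv[1][0][1])
    }

    print(ordered)  # {'chunk-B': ['ent-B1', 'rel-B2'], 'chunk-A': ['ent-A1', 'rel-A2']}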

@@ -630,14 +630,20 @@ async def _get_cached_extraction_results(
 ) -> dict[str, list[str]]:
     """Get cached extraction results for specific chunk IDs
+
+    This function retrieves cached LLM extraction results for the given chunk IDs and returns
+    them sorted by creation time. The results are sorted at two levels:
+    1. Individual extraction results within each chunk are sorted by create_time (earliest first)
+    2. Chunks themselves are sorted by the create_time of their earliest extraction result
 
     Args:
         llm_response_cache: LLM response cache storage
         chunk_ids: Set of chunk IDs to get cached results for
-        text_chunks_data: Pre-loaded chunk data (optional, for performance)
-        text_chunks_storage: Text chunks storage (fallback if text_chunks_data is None)
+        text_chunks_storage: Text chunks storage for retrieving chunk data and LLM cache references
 
     Returns:
-        Dict mapping chunk_id -> list of extraction_result_text
+        Dict mapping chunk_id -> list of extraction_result_text, where:
+        - Keys (chunk_ids) are ordered by the create_time of their first extraction result
+        - Values (extraction results) are ordered by create_time within each chunk
     """
 
     cached_results = {}
@@ -646,15 +652,13 @@ async def _get_cached_extraction_results(
 
     # Read from storage
     chunk_data_list = await text_chunks_storage.get_by_ids(list(chunk_ids))
-    for chunk_id, chunk_data in zip(chunk_ids, chunk_data_list):
+    for chunk_data in chunk_data_list:
         if chunk_data and isinstance(chunk_data, dict):
             llm_cache_list = chunk_data.get("llm_cache_list", [])
             if llm_cache_list:
                 all_cache_ids.update(llm_cache_list)
         else:
-            logger.warning(
-                f"Chunk {chunk_id} data is invalid or None: {type(chunk_data)}"
-            )
+            logger.warning(f"Chunk data is invalid or None: {chunk_data}")
 
     if not all_cache_ids:
         logger.warning(f"No LLM cache IDs found for {len(chunk_ids)} chunk IDs")
@@ -665,7 +669,7 @@ async def _get_cached_extraction_results(
 
     # Process cache entries and group by chunk_id
     valid_entries = 0
-    for cache_id, cache_entry in zip(all_cache_ids, cache_data_list):
+    for cache_entry in cache_data_list:
         if (
             cache_entry is not None
             and isinstance(cache_entry, dict)
@@ -685,16 +689,30 @@ async def _get_cached_extraction_results(
                 # Store tuple with extraction result and creation time for sorting
                 cached_results[chunk_id].append((extraction_result, create_time))
 
-    # Sort extraction results by create_time for each chunk
+    # Sort extraction results by create_time for each chunk and collect earliest times
+    chunk_earliest_times = {}
     for chunk_id in cached_results:
         # Sort by create_time (x[1]), then extract only extraction_result (x[0])
         cached_results[chunk_id].sort(key=lambda x: x[1])
+        # Store the earliest create_time for this chunk (first item after sorting)
+        chunk_earliest_times[chunk_id] = cached_results[chunk_id][0][1]
+        # Extract only extraction_result (x[0])
         cached_results[chunk_id] = [item[0] for item in cached_results[chunk_id]]
 
-    logger.info(
-        f"Found {valid_entries} valid cache entries, {len(cached_results)} chunks with results"
+    # Sort cached_results by the earliest create_time of each chunk
+    sorted_chunk_ids = sorted(
+        chunk_earliest_times.keys(), key=lambda chunk_id: chunk_earliest_times[chunk_id]
     )
-    return cached_results
+
+    # Rebuild cached_results in sorted order
+    sorted_cached_results = {}
+    for chunk_id in sorted_chunk_ids:
+        sorted_cached_results[chunk_id] = cached_results[chunk_id]
+
+    logger.info(
+        f"Found {valid_entries} valid cache entries, {len(sorted_cached_results)} chunks with results"
+    )
+    return sorted_cached_results
 
 
 async def _parse_extraction_result(
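
Because Python dicts preserve insertion order (guaranteed since Python 3.7), callers can rely on the iteration order of the returned mapping. A sketch of a consuming loop, assuming the signature documented in the diff above (the loop body is hypothetical):

    cached = await _get_cached_extraction_results(
        llm_response_cache, chunk_ids, text_chunks_storage
    )
    for chunk_id, extraction_results in cached.items():
        # Chunks arrive ordered by their first extraction's create_time;
        # within each chunk, results are ordered earliest-first.
        for extraction_result in extraction_results:
            ...  # replay each extraction in its original creation order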