diff --git a/lightrag/operate.py b/lightrag/operate.py
index 7ebb09df..779938af 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -630,14 +630,20 @@ async def _get_cached_extraction_results(
 ) -> dict[str, list[str]]:
     """Get cached extraction results for specific chunk IDs
 
+    This function retrieves cached LLM extraction results for the given chunk IDs and returns
+    them sorted by creation time. The results are sorted at two levels:
+    1. Individual extraction results within each chunk are sorted by create_time (earliest first)
+    2. Chunks themselves are sorted by the create_time of their earliest extraction result
+
     Args:
         llm_response_cache: LLM response cache storage
         chunk_ids: Set of chunk IDs to get cached results for
-        text_chunks_data: Pre-loaded chunk data (optional, for performance)
-        text_chunks_storage: Text chunks storage (fallback if text_chunks_data is None)
+        text_chunks_storage: Text chunks storage for retrieving chunk data and LLM cache references
 
     Returns:
-        Dict mapping chunk_id -> list of extraction_result_text
+        Dict mapping chunk_id -> list of extraction_result_text, where:
+        - Keys (chunk_ids) are ordered by the create_time of their first extraction result
+        - Values (extraction results) are ordered by create_time within each chunk
     """
     cached_results = {}
 
@@ -646,15 +652,13 @@ async def _get_cached_extraction_results(
 
     # Read from storage
     chunk_data_list = await text_chunks_storage.get_by_ids(list(chunk_ids))
-    for chunk_id, chunk_data in zip(chunk_ids, chunk_data_list):
+    for chunk_data in chunk_data_list:
         if chunk_data and isinstance(chunk_data, dict):
             llm_cache_list = chunk_data.get("llm_cache_list", [])
             if llm_cache_list:
                 all_cache_ids.update(llm_cache_list)
         else:
-            logger.warning(
-                f"Chunk {chunk_id} data is invalid or None: {type(chunk_data)}"
-            )
+            logger.warning(f"Chunk data is invalid or None: {chunk_data}")
 
     if not all_cache_ids:
         logger.warning(f"No LLM cache IDs found for {len(chunk_ids)} chunk IDs")
@@ -665,7 +669,7 @@ async def _get_cached_extraction_results(
 
     # Process cache entries and group by chunk_id
     valid_entries = 0
-    for cache_id, cache_entry in zip(all_cache_ids, cache_data_list):
+    for cache_entry in cache_data_list:
         if (
             cache_entry is not None
             and isinstance(cache_entry, dict)
@@ -685,16 +689,30 @@ async def _get_cached_extraction_results(
             # Store tuple with extraction result and creation time for sorting
             cached_results[chunk_id].append((extraction_result, create_time))
 
-    # Sort extraction results by create_time for each chunk
+    # Sort extraction results by create_time for each chunk and collect earliest times
+    chunk_earliest_times = {}
     for chunk_id in cached_results:
         # Sort by create_time (x[1]), then extract only extraction_result (x[0])
         cached_results[chunk_id].sort(key=lambda x: x[1])
+        # Store the earliest create_time for this chunk (first item after sorting)
+        chunk_earliest_times[chunk_id] = cached_results[chunk_id][0][1]
+        # Extract only extraction_result (x[0])
         cached_results[chunk_id] = [item[0] for item in cached_results[chunk_id]]
 
-    logger.info(
-        f"Found {valid_entries} valid cache entries, {len(cached_results)} chunks with results"
+    # Sort cached_results by the earliest create_time of each chunk
+    sorted_chunk_ids = sorted(
+        chunk_earliest_times.keys(), key=lambda chunk_id: chunk_earliest_times[chunk_id]
     )
-    return cached_results
+
+    # Rebuild cached_results in sorted order
+    sorted_cached_results = {}
+    for chunk_id in sorted_chunk_ids:
+        sorted_cached_results[chunk_id] = cached_results[chunk_id]
+
+    logger.info(
+        f"Found {valid_entries} valid cache entries, {len(sorted_cached_results)} chunks with results"
+    )
+    return sorted_cached_results
 
 
 async def _parse_extraction_result(