diff --git a/lightrag/operate.py b/lightrag/operate.py
index 69cd2205..f142aedf 100644
--- a/lightrag/operate.py
+++ b/lightrag/operate.py
@@ -300,7 +300,7 @@ async def _summarize_descriptions(
     )

     # Use LLM function with cache (higher priority for summary generation)
-    summary = await use_llm_func_with_cache(
+    summary, _ = await use_llm_func_with_cache(
         use_prompt,
         use_llm_func,
         llm_response_cache=llm_response_cache,
@@ -312,6 +312,7 @@
 async def _handle_single_entity_extraction(
     record_attributes: list[str],
     chunk_key: str,
+    timestamp: int,
     file_path: str = "unknown_source",
 ):
     if len(record_attributes) < 4 or "entity" not in record_attributes[0]:
@@ -364,6 +365,7 @@ async def _handle_single_entity_extraction(
             description=entity_description,
             source_id=chunk_key,
             file_path=file_path,
+            timestamp=timestamp,
         )
     except ValueError as e:
@@ -381,6 +383,7 @@
 async def _handle_single_relationship_extraction(
     record_attributes: list[str],
     chunk_key: str,
+    timestamp: int,
     file_path: str = "unknown_source",
 ):
     if len(record_attributes) < 5 or "relationship" not in record_attributes[0]:
@@ -441,6 +444,7 @@ async def _handle_single_relationship_extraction(
             keywords=edge_keywords,
             source_id=edge_source_id,
             file_path=file_path,
+            timestamp=timestamp,
         )
     except ValueError as e:
@@ -503,7 +507,7 @@ async def _rebuild_knowledge_from_chunks(
         pipeline_status["history_messages"].append(status_message)

     # Get cached extraction results for these chunks using storage
-    # cached_results: chunk_id -> [list of extraction result from LLM cache sorted by created_at]
+    # cached_results: chunk_id -> list of (extraction_result, create_time) tuples from LLM cache, with chunks ordered by their earliest create_time
     cached_results = await _get_cached_extraction_results(
         llm_response_cache,
         all_referenced_chunk_ids,
@@ -523,18 +527,19 @@
     chunk_entities = {}  # chunk_id -> {entity_name: [entity_data]}
     chunk_relationships = {}  # chunk_id -> {(src, tgt): [relationship_data]}

-    for chunk_id, extraction_results in cached_results.items():
+    for chunk_id, results in cached_results.items():
         try:
             # Handle multiple extraction results per chunk
             chunk_entities[chunk_id] = defaultdict(list)
             chunk_relationships[chunk_id] = defaultdict(list)

             # process multiple LLM extraction results for a single chunk_id
-            for extraction_result in extraction_results:
+            for result in results:
                 entities, relationships = await _rebuild_from_extraction_result(
                     text_chunks_storage=text_chunks_storage,
-                    extraction_result=extraction_result,
                     chunk_id=chunk_id,
+                    extraction_result=result[0],
+                    timestamp=result[1],
                 )

                 # Merge entities and relationships from this extraction result
@@ -820,8 +825,6 @@ async def _get_cached_extraction_results(
         cached_results[chunk_id].sort(key=lambda x: x[1])
         # Store the earliest create_time for this chunk (first item after sorting)
         chunk_earliest_times[chunk_id] = cached_results[chunk_id][0][1]
-        # Extract only extraction_result (x[0])
-        cached_results[chunk_id] = [item[0] for item in cached_results[chunk_id]]

     # Sort cached_results by the earliest create_time of each chunk
     sorted_chunk_ids = sorted(
@@ -836,12 +839,13 @@
     logger.info(
         f"Found {valid_entries} valid cache entries, {len(sorted_cached_results)} chunks with results"
     )
-    return sorted_cached_results
+    return sorted_cached_results  # each value: list of (extraction_result, create_time) tuples


 async def _process_extraction_result(
     result: str,
     chunk_key: str,
+    timestamp: int,
     file_path: str = "unknown_source",
     tuple_delimiter: str = "<|SEP|>",
     completion_delimiter: str = "<|COMPLETE|>",
@@ -886,7 +890,7 @@ async def _process_extraction_result(

         # Try to parse as entity
         entity_data = await _handle_single_entity_extraction(
-            record_attributes, chunk_key, file_path
+            record_attributes, chunk_key, timestamp, file_path
         )
         if entity_data is not None:
             maybe_nodes[entity_data["entity_name"]].append(entity_data)
@@ -894,7 +898,7 @@
         # Try to parse as relationship
         relationship_data = await _handle_single_relationship_extraction(
-            record_attributes, chunk_key, file_path
+            record_attributes, chunk_key, timestamp, file_path
         )
         if relationship_data is not None:
             maybe_edges[
@@ -905,7 +909,10 @@


 async def _rebuild_from_extraction_result(
-    text_chunks_storage: BaseKVStorage, extraction_result: str, chunk_id: str
+    text_chunks_storage: BaseKVStorage,
+    extraction_result: str,
+    chunk_id: str,
+    timestamp: int,
 ) -> tuple[dict, dict]:
     """Parse cached extraction result using the same logic as extract_entities

@@ -930,6 +937,7 @@ async def _rebuild_from_extraction_result(
     return await _process_extraction_result(
         extraction_result,
         chunk_id,
+        timestamp,
         file_path,
         tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
         completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
@@ -1256,13 +1264,19 @@ async def _merge_nodes_then_upsert(
         reverse=True,
     )[0][0]  # Get the entity type with the highest count

-    # merge and deduplicate description
-    description_list = list(
-        dict.fromkeys(
-            already_description
-            + [dp["description"] for dp in nodes_data if dp.get("description")]
-        )
-    )
+    # Deduplicate by description, keeping first occurrence
+    unique_nodes = {}
+    for dp in nodes_data:
+        desc = dp["description"]
+        if desc not in unique_nodes:
+            unique_nodes[desc] = dp
+
+    # Sort descriptions by timestamp
+    sorted_nodes = sorted(unique_nodes.values(), key=lambda x: x.get("timestamp", 0))
+    sorted_descriptions = [dp["description"] for dp in sorted_nodes]
+
+    # Combine already_description with sorted new descriptions
+    description_list = already_description + sorted_descriptions

     num_fragment = len(description_list)
     already_fragment = len(already_description)
@@ -1378,12 +1392,20 @@ async def _merge_edges_then_upsert(
     # Process edges_data with None checks
     weight = sum([dp["weight"] for dp in edges_data] + already_weights)

-    description_list = list(
-        dict.fromkeys(
-            already_description
-            + [dp["description"] for dp in edges_data if dp.get("description")]
-        )
-    )
+    # Deduplicate by description, keeping first occurrence
+    unique_edges = {}
+    for dp in edges_data:
+        if dp.get("description"):
+            desc = dp["description"]
+            if desc not in unique_edges:
+                unique_edges[desc] = dp
+
+    # Sort descriptions by timestamp
+    sorted_edges = sorted(unique_edges.values(), key=lambda x: x.get("timestamp", 0))
+    sorted_descriptions = [dp["description"] for dp in sorted_edges]
+
+    # Combine already_description with sorted new descriptions
+    description_list = already_description + sorted_descriptions

     num_fragment = len(description_list)
     already_fragment = len(already_description)
@@ -1966,7 +1988,7 @@ async def extract_entities(
                 "entity_continue_extraction_user_prompt"
             ].format(**{**context_base, "input_text": content})

-        final_result = await use_llm_func_with_cache(
+        final_result, timestamp = await use_llm_func_with_cache(
            entity_extraction_user_prompt,
             use_llm_func,
             system_prompt=entity_extraction_system_prompt,
@@ -1976,7 +1998,6 @@ async def extract_entities(
             cache_keys_collector=cache_keys_collector,
         )

-        # Store LLM cache reference in chunk (will be handled by use_llm_func_with_cache)
         history = pack_user_ass_to_openai_messages(
             entity_extraction_user_prompt, final_result
         )
@@ -1985,6 +2006,7 @@
         maybe_nodes, maybe_edges = await _process_extraction_result(
             final_result,
             chunk_key,
+            timestamp,
             file_path,
             tuple_delimiter=context_base["tuple_delimiter"],
             completion_delimiter=context_base["completion_delimiter"],
         )

         # Process additional gleaning results only 1 time when entity_extract_max_gleaning is greater than zero.
         if entity_extract_max_gleaning > 0:
-            glean_result = await use_llm_func_with_cache(
+            glean_result, timestamp = await use_llm_func_with_cache(
                 entity_continue_extraction_user_prompt,
                 use_llm_func,
                 system_prompt=entity_extraction_system_prompt,
@@ -2007,6 +2029,7 @@
             glean_nodes, glean_edges = await _process_extraction_result(
                 glean_result,
                 chunk_key,
+                timestamp,
                 file_path,
                 tuple_delimiter=context_base["tuple_delimiter"],
                 completion_delimiter=context_base["completion_delimiter"],
@@ -2157,10 +2180,11 @@ async def kg_query(
         query_param.user_prompt or "",
         query_param.enable_rerank,
     )
-    cached_response = await handle_cache(
+    cached_result = await handle_cache(
         hashing_kv, args_hash, query, query_param.mode, cache_type="query"
     )
-    if cached_response is not None:
+    if cached_result is not None:
+        cached_response, _ = cached_result  # Extract content, ignore timestamp
         if not query_param.only_need_context and not query_param.only_need_prompt:
             return cached_response
@@ -2325,10 +2349,11 @@ async def extract_keywords_only(
         param.hl_keywords or [],
         param.ll_keywords or [],
     )
-    cached_response = await handle_cache(
+    cached_result = await handle_cache(
         hashing_kv, args_hash, text, param.mode, cache_type="keywords"
     )
-    if cached_response is not None:
+    if cached_result is not None:
+        cached_response, _ = cached_result  # Extract content, ignore timestamp
         try:
             keywords_data = json_repair.loads(cached_response)
             return keywords_data.get("high_level_keywords", []), keywords_data.get(
@@ -2958,10 +2983,11 @@ async def kg_search(
         query_param.user_prompt or "",
         query_param.enable_rerank,
     )
-    cached_response = await handle_cache(
+    cached_result = await handle_cache(
         hashing_kv, args_hash, query, query_param.mode, cache_type="search"
     )
-    if cached_response is not None:
+    if cached_result is not None:
+        cached_response, _ = cached_result  # Extract content, ignore timestamp
         try:
             return json_repair.loads(cached_response)
         except (json.JSONDecodeError, KeyError):
@@ -3982,10 +4008,11 @@ async def naive_query(
         query_param.user_prompt or "",
         query_param.enable_rerank,
     )
-    cached_response = await handle_cache(
+    cached_result = await handle_cache(
         hashing_kv, args_hash, query, query_param.mode, cache_type="query"
     )
-    if cached_response is not None:
+    if cached_result is not None:
+        cached_response, _ = cached_result  # Extract content, ignore timestamp
         if not query_param.only_need_context and not query_param.only_need_prompt:
             return cached_response
diff --git a/lightrag/utils.py b/lightrag/utils.py
index cf39e64e..918fe015 100644
--- a/lightrag/utils.py
+++ b/lightrag/utils.py
@@ -9,6 +9,7 @@ import logging
 import logging.handlers
 import os
 import re
+import time
 import uuid
 from dataclasses import dataclass
 from datetime import datetime
@@ -1035,8 +1036,12 @@ async def handle_cache(
     prompt,
     mode="default",
     cache_type="unknown",
-) -> str | None:
-    """Generic cache handling function with flattened cache keys"""
+) -> tuple[str, int] | None:
+    """Generic cache handling function with flattened cache keys
+
+    Returns:
+        tuple[str, int] | None: (content, create_time) if cache hit, None if cache miss
+    """
     if hashing_kv is None:
         return None
@@ -1052,7 +1057,9 @@ async def handle_cache(
     cache_entry = await hashing_kv.get_by_id(flattened_key)
     if cache_entry:
         logger.debug(f"Flattened cache hit(key:{flattened_key})")
-        return cache_entry["return"]
+        content = cache_entry["return"]
+        timestamp = cache_entry.get("create_time", 0)
+        return content, timestamp

     logger.debug(f"Cache missed(mode:{mode} type:{cache_type})")
     return None
@@ -1593,7 +1600,7 @@ async def use_llm_func_with_cache(
     cache_type: str = "extract",
     chunk_id: str | None = None,
     cache_keys_collector: list = None,
-) -> str:
+) -> tuple[str, int]:
     """Call LLM function with cache support and text sanitization

     If cache is available and enabled (determined by handle_cache based on mode),
@@ -1613,7 +1620,9 @@
         cache_keys_collector: Optional list to collect cache keys for batch processing

     Returns:
-        LLM response text
+        tuple[str, int]: (LLM response text, timestamp)
+        - For cache hits: (content, cache_create_time)
+        - For cache misses: (content, current_timestamp)
     """
     # Sanitize input text to prevent UTF-8 encoding errors for all LLM providers
     safe_user_prompt = sanitize_text_for_encoding(user_prompt)
@@ -1648,14 +1657,15 @@
         # Generate cache key for this LLM call
         cache_key = generate_cache_key("default", cache_type, arg_hash)

-        cached_return = await handle_cache(
+        cached_result = await handle_cache(
             llm_response_cache,
             arg_hash,
             _prompt,
             "default",
             cache_type=cache_type,
         )
-        if cached_return:
+        if cached_result:
+            content, timestamp = cached_result
             logger.debug(f"Found cache for {arg_hash}")
             statistic_data["llm_cache"] += 1

@@ -1663,7 +1673,7 @@
             if cache_keys_collector is not None:
                 cache_keys_collector.append(cache_key)

-            return cached_return
+            return content, timestamp

         statistic_data["llm_call"] += 1
         # Call LLM with sanitized input
@@ -1679,6 +1689,9 @@

         res = remove_think_tags(res)

+        # Generate timestamp for cache miss (LLM call completion time)
+        current_timestamp = int(time.time())
+
         if llm_response_cache.global_config.get("enable_llm_cache_for_entity_extract"):
             await save_to_cache(
                 llm_response_cache,
@@ -1695,7 +1708,7 @@
         if cache_keys_collector is not None:
             cache_keys_collector.append(cache_key)

-        return res
+        return res, current_timestamp

     # When cache is disabled, directly call LLM with sanitized input
     kwargs = {}
@@ -1714,7 +1727,9 @@
         # Re-raise with the same exception type but modified message
         raise type(e)(error_msg) from e

-    return remove_think_tags(res)
+    # Generate timestamp for non-cached LLM call
+    current_timestamp = int(time.time())
+    return remove_think_tags(res), current_timestamp


 def get_content_summary(content: str, max_length: int = 250) -> str:
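
Reviewer note: the snippet below is a minimal standalone sketch, not part of the patch, illustrating the dedup-then-sort-by-timestamp merge that _merge_nodes_then_upsert and _merge_edges_then_upsert now apply to descriptions; all variable names and sample values are hypothetical.

# Sketch only: mirrors the description-merge logic from the hunks above, for illustration.
already_description = ["existing summary"]  # fragments already stored on the node
nodes_data = [
    {"description": "seen second", "timestamp": 1700000200},
    {"description": "seen first", "timestamp": 1700000100},
    {"description": "seen first", "timestamp": 1700000300},  # duplicate description, dropped
]

# Deduplicate by description, keeping the first occurrence of each
unique_nodes = {}
for dp in nodes_data:
    desc = dp["description"]
    if desc not in unique_nodes:
        unique_nodes[desc] = dp

# Order the surviving fragments by extraction timestamp, oldest first
sorted_descriptions = [
    dp["description"]
    for dp in sorted(unique_nodes.values(), key=lambda x: x.get("timestamp", 0))
]
description_list = already_description + sorted_descriptions
print(description_list)  # ['existing summary', 'seen first', 'seen second']

Existing fragments keep their position; new fragments are appended in chronological order of the LLM calls that produced them, which is what the timestamp now threaded through handle_cache and use_llm_func_with_cache makes possible.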