Add timestamp tracking for LLM responses and entity/relationship data

- Track timestamps for cache hits/misses
- Add timestamp to entity/relationship objects
- Sort descriptions by timestamp order
- Preserve temporal ordering in merges
yangdx 2025-09-12 04:34:12 +08:00
parent 40688def20
commit 8660bf34e4
2 changed files with 87 additions and 45 deletions
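
The bullets in the commit message describe the ordering rule this change introduces: new description fragments are deduplicated by text, sorted by the timestamp of the LLM response they came from, and appended after the descriptions already stored. A minimal, self-contained sketch of that idea follows; `merge_descriptions` and the flat entity dicts are illustrative stand-ins, not the functions changed in this commit.

```python
# Illustrative sketch only: simplified shapes, hypothetical helper name.
def merge_descriptions(already_descriptions: list[str], new_items: list[dict]) -> list[str]:
    """Deduplicate new description fragments by text (first occurrence wins),
    order them by the timestamp of the LLM response that produced them,
    and append them after the descriptions that are already stored."""
    unique: dict[str, dict] = {}
    for item in new_items:
        desc = item.get("description")
        if desc and desc not in unique:
            unique[desc] = item
    # Older extraction results come first; items without a timestamp sort to the front.
    ordered = sorted(unique.values(), key=lambda x: x.get("timestamp", 0))
    return already_descriptions + [item["description"] for item in ordered]


if __name__ == "__main__":
    existing = ["Alice founded Acme in 2001."]
    new = [
        {"description": "Alice sold Acme in 2010.", "timestamp": 1726000000},
        {"description": "Alice is an engineer.", "timestamp": 1725000000},
        {"description": "Alice sold Acme in 2010.", "timestamp": 1726100000},  # duplicate text, dropped
    ]
    print(merge_descriptions(existing, new))
    # ['Alice founded Acme in 2001.', 'Alice is an engineer.', 'Alice sold Acme in 2010.']
```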


@ -300,7 +300,7 @@ async def _summarize_descriptions(
)
# Use LLM function with cache (higher priority for summary generation)
summary = await use_llm_func_with_cache(
summary, _ = await use_llm_func_with_cache(
use_prompt,
use_llm_func,
llm_response_cache=llm_response_cache,
@ -312,6 +312,7 @@ async def _summarize_descriptions(
async def _handle_single_entity_extraction(
record_attributes: list[str],
chunk_key: str,
timestamp: int,
file_path: str = "unknown_source",
):
if len(record_attributes) < 4 or "entity" not in record_attributes[0]:
@ -364,6 +365,7 @@ async def _handle_single_entity_extraction(
description=entity_description,
source_id=chunk_key,
file_path=file_path,
timestamp=timestamp,
)
except ValueError as e:
@ -381,6 +383,7 @@ async def _handle_single_entity_extraction(
async def _handle_single_relationship_extraction(
record_attributes: list[str],
chunk_key: str,
timestamp: int,
file_path: str = "unknown_source",
):
if len(record_attributes) < 5 or "relationship" not in record_attributes[0]:
@ -441,6 +444,7 @@ async def _handle_single_relationship_extraction(
keywords=edge_keywords,
source_id=edge_source_id,
file_path=file_path,
timestamp=timestamp,
)
except ValueError as e:
@ -503,7 +507,7 @@ async def _rebuild_knowledge_from_chunks(
pipeline_status["history_messages"].append(status_message)
# Get cached extraction results for these chunks using storage
# cached_results chunk_id -> [list of extraction result from LLM cache sorted by created_at]
# cached_results chunk_id -> [list of (extraction_result, create_time) from LLM cache sorted by create_time of the first extraction_result]
cached_results = await _get_cached_extraction_results(
llm_response_cache,
all_referenced_chunk_ids,
@ -523,18 +527,19 @@ async def _rebuild_knowledge_from_chunks(
chunk_entities = {} # chunk_id -> {entity_name: [entity_data]}
chunk_relationships = {} # chunk_id -> {(src, tgt): [relationship_data]}
for chunk_id, extraction_results in cached_results.items():
for chunk_id, results in cached_results.items():
try:
# Handle multiple extraction results per chunk
chunk_entities[chunk_id] = defaultdict(list)
chunk_relationships[chunk_id] = defaultdict(list)
# process multiple LLM extraction results for a single chunk_id
for extraction_result in extraction_results:
for result in results:
entities, relationships = await _rebuild_from_extraction_result(
text_chunks_storage=text_chunks_storage,
extraction_result=extraction_result,
chunk_id=chunk_id,
extraction_result=result[0],
timestamp=result[1],
)
# Merge entities and relationships from this extraction result
@ -820,8 +825,6 @@ async def _get_cached_extraction_results(
cached_results[chunk_id].sort(key=lambda x: x[1])
# Store the earliest create_time for this chunk (first item after sorting)
chunk_earliest_times[chunk_id] = cached_results[chunk_id][0][1]
# Extract only extraction_result (x[0])
cached_results[chunk_id] = [item[0] for item in cached_results[chunk_id]]
# Sort cached_results by the earliest create_time of each chunk
sorted_chunk_ids = sorted(
@ -836,12 +839,13 @@ async def _get_cached_extraction_results(
logger.info(
f"Found {valid_entries} valid cache entries, {len(sorted_cached_results)} chunks with results"
)
return sorted_cached_results
return sorted_cached_results # each item: list(extraction_result, create_time)
async def _process_extraction_result(
result: str,
chunk_key: str,
timestamp: int,
file_path: str = "unknown_source",
tuple_delimiter: str = "<|SEP|>",
completion_delimiter: str = "<|COMPLETE|>",
@ -886,7 +890,7 @@ async def _process_extraction_result(
# Try to parse as entity
entity_data = await _handle_single_entity_extraction(
record_attributes, chunk_key, file_path
record_attributes, chunk_key, timestamp, file_path
)
if entity_data is not None:
maybe_nodes[entity_data["entity_name"]].append(entity_data)
@ -894,7 +898,7 @@ async def _process_extraction_result(
# Try to parse as relationship
relationship_data = await _handle_single_relationship_extraction(
record_attributes, chunk_key, file_path
record_attributes, chunk_key, timestamp, file_path
)
if relationship_data is not None:
maybe_edges[
@ -905,7 +909,10 @@ async def _process_extraction_result(
async def _rebuild_from_extraction_result(
text_chunks_storage: BaseKVStorage, extraction_result: str, chunk_id: str
text_chunks_storage: BaseKVStorage,
extraction_result: str,
chunk_id: str,
timestamp: int,
) -> tuple[dict, dict]:
"""Parse cached extraction result using the same logic as extract_entities
@ -930,6 +937,7 @@ async def _rebuild_from_extraction_result(
return await _process_extraction_result(
extraction_result,
chunk_id,
timestamp,
file_path,
tuple_delimiter=PROMPTS["DEFAULT_TUPLE_DELIMITER"],
completion_delimiter=PROMPTS["DEFAULT_COMPLETION_DELIMITER"],
@ -1256,13 +1264,19 @@ async def _merge_nodes_then_upsert(
reverse=True,
)[0][0] # Get the entity type with the highest count
# merge and deduplicate description
description_list = list(
dict.fromkeys(
already_description
+ [dp["description"] for dp in nodes_data if dp.get("description")]
)
)
# Deduplicate by description, keeping first occurrence
unique_nodes = {}
for dp in nodes_data:
desc = dp["description"]
if desc not in unique_nodes:
unique_nodes[desc] = dp
# Sort descriptions by timestamp
sorted_nodes = sorted(unique_nodes.values(), key=lambda x: x.get("timestamp", 0))
sorted_descriptions = [dp["description"] for dp in sorted_nodes]
# Combine already_description with sorted new descriptions
description_list = already_description + sorted_descriptions
num_fragment = len(description_list)
already_fragment = len(already_description)
@ -1378,12 +1392,20 @@ async def _merge_edges_then_upsert(
# Process edges_data with None checks
weight = sum([dp["weight"] for dp in edges_data] + already_weights)
description_list = list(
dict.fromkeys(
already_description
+ [dp["description"] for dp in edges_data if dp.get("description")]
)
)
# Deduplicate by description, keeping first occurrence
unique_edges = {}
for dp in edges_data:
if dp.get("description"):
desc = dp["description"]
if desc not in unique_edges:
unique_edges[desc] = dp
# Sort descriptions by timestamp
sorted_edges = sorted(unique_edges.values(), key=lambda x: x.get("timestamp", 0))
sorted_descriptions = [dp["description"] for dp in sorted_edges]
# Combine already_description with sorted new descriptions
description_list = already_description + sorted_descriptions
num_fragment = len(description_list)
already_fragment = len(already_description)
@ -1966,7 +1988,7 @@ async def extract_entities(
"entity_continue_extraction_user_prompt"
].format(**{**context_base, "input_text": content})
final_result = await use_llm_func_with_cache(
final_result, timestamp = await use_llm_func_with_cache(
entity_extraction_user_prompt,
use_llm_func,
system_prompt=entity_extraction_system_prompt,
@ -1976,7 +1998,6 @@ async def extract_entities(
cache_keys_collector=cache_keys_collector,
)
# Store LLM cache reference in chunk (will be handled by use_llm_func_with_cache)
history = pack_user_ass_to_openai_messages(
entity_extraction_user_prompt, final_result
)
@ -1985,6 +2006,7 @@ async def extract_entities(
maybe_nodes, maybe_edges = await _process_extraction_result(
final_result,
chunk_key,
timestamp,
file_path,
tuple_delimiter=context_base["tuple_delimiter"],
completion_delimiter=context_base["completion_delimiter"],
@ -1992,7 +2014,7 @@ async def extract_entities(
# Process additional gleaning results only 1 time when entity_extract_max_gleaning is greater than zero.
if entity_extract_max_gleaning > 0:
glean_result = await use_llm_func_with_cache(
glean_result, timestamp = await use_llm_func_with_cache(
entity_continue_extraction_user_prompt,
use_llm_func,
system_prompt=entity_extraction_system_prompt,
@ -2007,6 +2029,7 @@ async def extract_entities(
glean_nodes, glean_edges = await _process_extraction_result(
glean_result,
chunk_key,
timestamp,
file_path,
tuple_delimiter=context_base["tuple_delimiter"],
completion_delimiter=context_base["completion_delimiter"],
@ -2157,10 +2180,11 @@ async def kg_query(
query_param.user_prompt or "",
query_param.enable_rerank,
)
cached_response = await handle_cache(
cached_result = await handle_cache(
hashing_kv, args_hash, query, query_param.mode, cache_type="query"
)
if cached_response is not None:
if cached_result is not None:
cached_response, _ = cached_result # Extract content, ignore timestamp
if not query_param.only_need_context and not query_param.only_need_prompt:
return cached_response
@ -2325,10 +2349,11 @@ async def extract_keywords_only(
param.hl_keywords or [],
param.ll_keywords or [],
)
cached_response = await handle_cache(
cached_result = await handle_cache(
hashing_kv, args_hash, text, param.mode, cache_type="keywords"
)
if cached_response is not None:
if cached_result is not None:
cached_response, _ = cached_result # Extract content, ignore timestamp
try:
keywords_data = json_repair.loads(cached_response)
return keywords_data.get("high_level_keywords", []), keywords_data.get(
@ -2958,10 +2983,11 @@ async def kg_search(
query_param.user_prompt or "",
query_param.enable_rerank,
)
cached_response = await handle_cache(
cached_result = await handle_cache(
hashing_kv, args_hash, query, query_param.mode, cache_type="search"
)
if cached_response is not None:
if cached_result is not None:
cached_response, _ = cached_result # Extract content, ignore timestamp
try:
return json_repair.loads(cached_response)
except (json.JSONDecodeError, KeyError):
@ -3982,10 +4008,11 @@ async def naive_query(
query_param.user_prompt or "",
query_param.enable_rerank,
)
cached_response = await handle_cache(
cached_result = await handle_cache(
hashing_kv, args_hash, query, query_param.mode, cache_type="query"
)
if cached_response is not None:
if cached_result is not None:
cached_response, _ = cached_result # Extract content, ignore timestamp
if not query_param.only_need_context and not query_param.only_need_prompt:
return cached_response
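
The hunks above close out the first changed file. Before the second file, here is a hedged sketch of how the rebuild path can thread the cached create_time through to every reconstructed entity, which is the effect of passing `timestamp` into the extraction handlers; `parse_entities` and `rebuild_chunk_entities` are hypothetical stand-ins with simplified data shapes, not the project's real parser or rebuild loop.

```python
from collections import defaultdict

def parse_entities(extraction_result: str) -> list[dict]:
    """Stand-in parser: one entity per line, formatted 'name|description'."""
    entities = []
    for line in extraction_result.splitlines():
        if "|" in line:
            name, desc = line.split("|", 1)
            entities.append({"entity_name": name.strip(), "description": desc.strip()})
    return entities

def rebuild_chunk_entities(cached_results: dict[str, list[tuple[str, int]]]) -> dict:
    """cached_results: chunk_id -> [(extraction_result_text, create_time), ...],
    assumed sorted by create_time. Each rebuilt entity inherits the create_time
    of the cached LLM response it was parsed from."""
    chunk_entities: dict[str, dict] = {}
    for chunk_id, results in cached_results.items():
        chunk_entities[chunk_id] = defaultdict(list)
        for extraction_result, create_time in results:
            for entity in parse_entities(extraction_result):
                entity["timestamp"] = create_time  # temporal origin, used later for ordering
                chunk_entities[chunk_id][entity["entity_name"]].append(entity)
    return chunk_entities
```

Downstream merge code can then rely on `entity["timestamp"]` without caring whether the data came from a fresh LLM call or from the extraction cache.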


@ -9,6 +9,7 @@ import logging
import logging.handlers
import os
import re
import time
import uuid
from dataclasses import dataclass
from datetime import datetime
@ -1035,8 +1036,12 @@ async def handle_cache(
prompt,
mode="default",
cache_type="unknown",
) -> str | None:
"""Generic cache handling function with flattened cache keys"""
) -> tuple[str, int] | None:
"""Generic cache handling function with flattened cache keys
Returns:
tuple[str, int] | None: (content, create_time) if cache hit, None if cache miss
"""
if hashing_kv is None:
return None
@ -1052,7 +1057,9 @@ async def handle_cache(
cache_entry = await hashing_kv.get_by_id(flattened_key)
if cache_entry:
logger.debug(f"Flattened cache hit(key:{flattened_key})")
return cache_entry["return"]
content = cache_entry["return"]
timestamp = cache_entry.get("create_time", 0)
return content, timestamp
logger.debug(f"Cache missed(mode:{mode} type:{cache_type})")
return None
@ -1593,7 +1600,7 @@ async def use_llm_func_with_cache(
cache_type: str = "extract",
chunk_id: str | None = None,
cache_keys_collector: list = None,
) -> str:
) -> tuple[str, int]:
"""Call LLM function with cache support and text sanitization
If cache is available and enabled (determined by handle_cache based on mode),
@ -1613,7 +1620,9 @@ async def use_llm_func_with_cache(
cache_keys_collector: Optional list to collect cache keys for batch processing
Returns:
LLM response text
tuple[str, int]: (LLM response text, timestamp)
- For cache hits: (content, cache_create_time)
- For cache misses: (content, current_timestamp)
"""
# Sanitize input text to prevent UTF-8 encoding errors for all LLM providers
safe_user_prompt = sanitize_text_for_encoding(user_prompt)
@ -1648,14 +1657,15 @@ async def use_llm_func_with_cache(
# Generate cache key for this LLM call
cache_key = generate_cache_key("default", cache_type, arg_hash)
cached_return = await handle_cache(
cached_result = await handle_cache(
llm_response_cache,
arg_hash,
_prompt,
"default",
cache_type=cache_type,
)
if cached_return:
if cached_result:
content, timestamp = cached_result
logger.debug(f"Found cache for {arg_hash}")
statistic_data["llm_cache"] += 1
@ -1663,7 +1673,7 @@ async def use_llm_func_with_cache(
if cache_keys_collector is not None:
cache_keys_collector.append(cache_key)
return cached_return
return content, timestamp
statistic_data["llm_call"] += 1
# Call LLM with sanitized input
@ -1679,6 +1689,9 @@ async def use_llm_func_with_cache(
res = remove_think_tags(res)
# Generate timestamp for cache miss (LLM call completion time)
current_timestamp = int(time.time())
if llm_response_cache.global_config.get("enable_llm_cache_for_entity_extract"):
await save_to_cache(
llm_response_cache,
@ -1695,7 +1708,7 @@ async def use_llm_func_with_cache(
if cache_keys_collector is not None:
cache_keys_collector.append(cache_key)
return res
return res, current_timestamp
# When cache is disabled, directly call LLM with sanitized input
kwargs = {}
@ -1714,7 +1727,9 @@ async def use_llm_func_with_cache(
# Re-raise with the same exception type but modified message
raise type(e)(error_msg) from e
return remove_think_tags(res)
# Generate timestamp for non-cached LLM call
current_timestamp = int(time.time())
return remove_think_tags(res), current_timestamp
def get_content_summary(content: str, max_length: int = 250) -> str:
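
The second changed file gives the cache helpers a uniform return contract: a cache hit returns the stored content together with the entry's create_time, and a cache miss returns the fresh LLM output together with `int(time.time())` taken when the call completes. A minimal, self-contained model of that contract is sketched below; `call_llm_with_cache` and the plain-dict cache are illustrative assumptions, not the project's actual API.

```python
import time

async def call_llm_with_cache(prompt: str, cache: dict, llm_func) -> tuple[str, int]:
    """Illustrative model: always return (content, timestamp).
    Hit  -> (cached content, create_time stored with the entry).
    Miss -> (fresh LLM output, completion time of the call)."""
    entry = cache.get(prompt)
    if entry is not None:
        return entry["return"], entry.get("create_time", 0)
    content = await llm_func(prompt)
    timestamp = int(time.time())
    cache[prompt] = {"return": content, "create_time": timestamp}
    return content, timestamp

# Callers that do not need the temporal information simply discard it:
#   content, _ = await call_llm_with_cache(prompt, cache, llm_func)
```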