Merge pull request #2006 from danielaskdd/optimize-merge-stage
refac: Refactor LLM Summary Generation Algorithm
commit 82f72521f5
11 changed files with 581 additions and 357 deletions
@@ -268,7 +268,8 @@ if __name__ == "__main__":
| **embedding_func_max_async** | `int` | Maximum number of concurrent asynchronous embedding processes | `16` |
| **llm_model_func** | `callable` | Function for LLM generation | `gpt_4o_mini_complete` |
| **llm_model_name** | `str` | LLM model name for generation | `meta-llama/Llama-3.2-1B-Instruct` |
-| **summary_max_tokens** | `int` | Maximum tokens sent to the LLM when generating entity/relation summaries | `30000` (set by env var SUMMARY_MAX_TOKENS) |
+| **summary_context_size** | `int` | Maximum tokens sent to the LLM when merging entity/relation summaries | `10000` (set by env var SUMMARY_CONTEXT_SIZE) |
+| **summary_max_tokens** | `int` | Maximum token length of a merged entity/relation description | `500` (set by env var SUMMARY_MAX_TOKENS) |
| **llm_model_max_async** | `int` | Maximum number of concurrent asynchronous LLM processes | `4` (default overridden by env var MAX_ASYNC) |
| **llm_model_kwargs** | `dict` | Additional parameters for LLM generation | |
| **vector_db_storage_cls_kwargs** | `dict` | Additional parameters for the vector database, e.g. the similarity threshold for node and relation retrieval | cosine_better_than_threshold: 0.2 (default overridden by env var COSINE_THRESHOLD) |
@@ -598,9 +599,9 @@ if __name__ == "__main__":
To improve retrieval quality, documents can be re-ranked with a more accurate relevance-scoring model. The `rerank.py` file provides driver functions for three reranker providers:

* **Cohere / vLLM**: `cohere_rerank`
* **Jina AI**: `jina_rerank`
* **Aliyun**: `ali_rerank`

You can inject one of these functions into the `rerank_model_func` attribute of a LightRAG object. LightRAG's query functions will then use the injected function to re-rank retrieved text chunks; see `examples/rerank_example.py` for detailed usage, and the sketch below.
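Outside the diff, a minimal sketch of that injection. The `lightrag.rerank` import path and the `model` keyword argument are assumptions, not confirmed by this PR:

```python
from functools import partial

from lightrag import LightRAG
from lightrag.rerank import cohere_rerank  # driver from rerank.py; path assumed

# Bind provider-specific settings up front; the model name is hypothetical.
rerank_func = partial(cohere_rerank, model="rerank-v3.5")

rag = LightRAG(
    working_dir="./rag_storage",
    rerank_model_func=rerank_func,  # queries now re-rank retrieved chunks
)
```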
|
@ -275,7 +275,8 @@ A full list of LightRAG init parameters:
|
|||
| **embedding_func_max_async** | `int` | Maximum number of concurrent asynchronous embedding processes | `16` |
|
||||
| **llm_model_func** | `callable` | Function for LLM generation | `gpt_4o_mini_complete` |
|
||||
| **llm_model_name** | `str` | LLM model name for generation | `meta-llama/Llama-3.2-1B-Instruct` |
|
||||
| **summary_max_tokens** | `int` | Maximum tokens send to LLM to generate entity relation summaries | `30000`(configured by env var SUMMARY_MAX_TOKENS) |
|
||||
| **summary_context_size** | `int` | Maximum tokens send to LLM to generate summaries for entity relation merging | `10000`(configured by env var SUMMARY_CONTEXT_SIZE) |
|
||||
| **summary_max_tokens** | `int` | Maximum token size for entity/relation description | `500`(configured by env var SUMMARY_MAX_TOKENS) |
|
||||
| **llm_model_max_async** | `int` | Maximum number of concurrent asynchronous LLM processes | `4`(default value changed by env var MAX_ASYNC) |
|
||||
| **llm_model_kwargs** | `dict` | Additional parameters for LLM generation | |
|
||||
| **vector_db_storage_cls_kwargs** | `dict` | Additional parameters for vector database, like setting the threshold for nodes and relations retrieval | cosine_better_than_threshold: 0.2(default value changed by env var COSINE_THRESHOLD) |
|
||||
|
|
|
|||
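Outside the diff, a hedged sketch of initializing LightRAG with the renamed summary parameters. The `gpt_4o_mini_complete` import path is an assumption; the values mirror the defaults in the table above:

```python
from lightrag import LightRAG
from lightrag.llm.openai import gpt_4o_mini_complete  # import path assumed

rag = LightRAG(
    working_dir="./rag_storage",
    llm_model_func=gpt_4o_mini_complete,
    summary_max_tokens=500,      # max token size of a merged description
    summary_context_size=10000,  # max tokens sent to the LLM per summary call
    llm_model_max_async=4,
)
```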
env.example — 15 changed lines
@@ -125,12 +125,15 @@ ENABLE_LLM_CACHE_FOR_EXTRACT=true
### Chunk size for document splitting, 500~1500 is recommended
# CHUNK_SIZE=1200
# CHUNK_OVERLAP_SIZE=100
### Entity and relation summarization configuration
-### Number of duplicated entities/edges to trigger LLM re-summary on merge (at least 3 is recommended), and max tokens sent to LLM
-# FORCE_LLM_SUMMARY_ON_MERGE=4
-# SUMMARY_MAX_TOKENS=30000
### Maximum number of entity extraction attempts for ambiguous content
# MAX_GLEANING=1

+### Number of summary segments or tokens to trigger LLM summary on entity/relation merge (at least 3 is recommended)
+# FORCE_LLM_SUMMARY_ON_MERGE=8
+### Max description token size to trigger LLM summary
+# SUMMARY_MAX_TOKENS=1200
+### Recommended LLM summary output length in tokens
+# SUMMARY_LENGTH_RECOMMENDED=600
+### Maximum context size sent to LLM for description summary
+# SUMMARY_CONTEXT_SIZE=12000

###############################
### Concurrency Configuration
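Outside the diff, an illustrative note on how these four knobs relate. The values are the PR defaults, and the assertion simply expresses the intended ordering (summary target below the per-description cap, which in turn fits the LLM context budget):

```python
# Defaults introduced by this PR (see constants.py further down).
FORCE_LLM_SUMMARY_ON_MERGE = 8    # this many merged fragments triggers an LLM summary
SUMMARY_MAX_TOKENS = 1200         # descriptions longer than this trigger an LLM summary
SUMMARY_LENGTH_RECOMMENDED = 600  # target output length for an LLM summary
SUMMARY_CONTEXT_SIZE = 12000      # cap on tokens sent to the LLM per summary call

# The defaults satisfy the ordering that the runtime validation below warns about.
assert SUMMARY_LENGTH_RECOMMENDED < SUMMARY_MAX_TOKENS <= SUMMARY_CONTEXT_SIZE
```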
@@ -30,6 +30,8 @@ from lightrag.constants import (
    DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE,
    DEFAULT_MAX_ASYNC,
    DEFAULT_SUMMARY_MAX_TOKENS,
+   DEFAULT_SUMMARY_LENGTH_RECOMMENDED,
+   DEFAULT_SUMMARY_CONTEXT_SIZE,
    DEFAULT_SUMMARY_LANGUAGE,
    DEFAULT_EMBEDDING_FUNC_MAX_ASYNC,
    DEFAULT_EMBEDDING_BATCH_NUM,
@@ -119,10 +121,26 @@ def parse_args() -> argparse.Namespace:
        help=f"Maximum async operations (default: from env or {DEFAULT_MAX_ASYNC})",
    )
    parser.add_argument(
-       "--max-tokens",
+       "--summary-max-tokens",
        type=int,
        default=get_env_value("SUMMARY_MAX_TOKENS", DEFAULT_SUMMARY_MAX_TOKENS, int),
-       help=f"Maximum token size (default: from env or {DEFAULT_SUMMARY_MAX_TOKENS})",
+       help=f"Maximum token size for entity/relation summary (default: from env or {DEFAULT_SUMMARY_MAX_TOKENS})",
    )
+   parser.add_argument(
+       "--summary-context-size",
+       type=int,
+       default=get_env_value(
+           "SUMMARY_CONTEXT_SIZE", DEFAULT_SUMMARY_CONTEXT_SIZE, int
+       ),
+       help=f"LLM summary context size (default: from env or {DEFAULT_SUMMARY_CONTEXT_SIZE})",
+   )
+   parser.add_argument(
+       "--summary-length-recommended",
+       type=int,
+       default=get_env_value(
+           "SUMMARY_LENGTH_RECOMMENDED", DEFAULT_SUMMARY_LENGTH_RECOMMENDED, int
+       ),
+       help=f"Recommended LLM summary output length (default: from env or {DEFAULT_SUMMARY_LENGTH_RECOMMENDED})",
+   )

    # Logging configuration
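Outside the diff, the precedence these arguments encode — an explicit flag wins, otherwise the env var, otherwise the constant. `get_env_value` below is a stand-in that assumes the behavior of the repo helper:

```python
import argparse
import os

def get_env_value(name: str, default, cast):  # stand-in for the repo helper
    raw = os.getenv(name)
    return cast(raw) if raw is not None else default

os.environ["SUMMARY_CONTEXT_SIZE"] = "8000"  # env overrides the constant default

parser = argparse.ArgumentParser()
parser.add_argument(
    "--summary-context-size",
    type=int,
    default=get_env_value("SUMMARY_CONTEXT_SIZE", 12000, int),
)

print(parser.parse_args([]).summary_context_size)  # 8000 (from env)
print(parser.parse_args(["--summary-context-size", "6000"]).summary_context_size)  # 6000 (flag wins)
```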
@@ -2,7 +2,7 @@
LightRAG FastAPI Server
"""

-from fastapi import FastAPI, Depends, HTTPException, status
+from fastapi import FastAPI, Depends, HTTPException
import asyncio
import os
import logging
@@ -472,7 +472,8 @@ def create_app(args):
            ),
            llm_model_name=args.llm_model,
            llm_model_max_async=args.max_async,
-           summary_max_tokens=args.max_tokens,
+           summary_max_tokens=args.summary_max_tokens,
+           summary_context_size=args.summary_context_size,
            chunk_token_size=int(args.chunk_size),
            chunk_overlap_token_size=int(args.chunk_overlap_size),
            llm_model_kwargs=(
@@ -510,7 +511,8 @@ def create_app(args):
            chunk_overlap_token_size=int(args.chunk_overlap_size),
            llm_model_name=args.llm_model,
            llm_model_max_async=args.max_async,
-           summary_max_tokens=args.max_tokens,
+           summary_max_tokens=args.summary_max_tokens,
+           summary_context_size=args.summary_context_size,
            embedding_func=embedding_func,
            kv_storage=args.kv_storage,
            graph_storage=args.graph_storage,
@@ -597,9 +599,7 @@ def create_app(args):
        }
        username = form_data.username
        if auth_handler.accounts.get(username) != form_data.password:
-           raise HTTPException(
-               status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect credentials"
-           )
+           raise HTTPException(status_code=401, detail="Incorrect credentials")

        # Regular user login
        user_token = auth_handler.create_token(
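Outside the diff: the simplified failure path passes the literal status code, which is what lets the `status` import above be dropped. A minimal self-contained sketch (the `accounts` mapping is hypothetical):

```python
from fastapi import HTTPException

def check_credentials(username: str, password: str, accounts: dict[str, str]) -> None:
    # Hypothetical accounts mapping, for illustration only.
    if accounts.get(username) != password:
        # The 401 literal replaces status.HTTP_401_UNAUTHORIZED.
        raise HTTPException(status_code=401, detail="Incorrect credentials")
```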
@@ -642,7 +642,8 @@ def create_app(args):
        "embedding_binding": args.embedding_binding,
        "embedding_binding_host": args.embedding_binding_host,
        "embedding_model": args.embedding_model,
-       "max_tokens": args.max_tokens,
+       "summary_max_tokens": args.summary_max_tokens,
+       "summary_context_size": args.summary_context_size,
        "kv_storage": args.kv_storage,
        "doc_status_storage": args.doc_status_storage,
        "graph_storage": args.graph_storage,
@@ -242,8 +242,8 @@ def display_splash_screen(args: argparse.Namespace) -> None:
    ASCIIColors.yellow(f"{args.llm_model}")
    ASCIIColors.white(" ├─ Max Async for LLM: ", end="")
    ASCIIColors.yellow(f"{args.max_async}")
-   ASCIIColors.white(" ├─ Max Tokens: ", end="")
-   ASCIIColors.yellow(f"{args.max_tokens}")
+   ASCIIColors.white(" ├─ Summary Context Size: ", end="")
+   ASCIIColors.yellow(f"{args.summary_context_size}")
    ASCIIColors.white(" ├─ LLM Cache Enabled: ", end="")
    ASCIIColors.yellow(f"{args.enable_llm_cache}")
    ASCIIColors.white(" └─ LLM Cache for Extraction Enabled: ", end="")
@@ -12,9 +12,16 @@ DEFAULT_MAX_GRAPH_NODES = 1000

# Default values for extraction settings
DEFAULT_SUMMARY_LANGUAGE = "English"  # Default language for summaries
-DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 4
DEFAULT_MAX_GLEANING = 1
-DEFAULT_SUMMARY_MAX_TOKENS = 30000  # Default maximum token size
+
+# Number of description fragments to trigger LLM summary
+DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 8
+# Max description token size to trigger LLM summary
+DEFAULT_SUMMARY_MAX_TOKENS = 1200
+# Recommended LLM summary output length in tokens
+DEFAULT_SUMMARY_LENGTH_RECOMMENDED = 600
+# Maximum token size sent to LLM for summary
+DEFAULT_SUMMARY_CONTEXT_SIZE = 12000

# Separator for graph fields
GRAPH_FIELD_SEP = "<SEP>"
@@ -34,6 +34,8 @@ from lightrag.constants import (
    DEFAULT_KG_CHUNK_PICK_METHOD,
    DEFAULT_MIN_RERANK_SCORE,
    DEFAULT_SUMMARY_MAX_TOKENS,
+   DEFAULT_SUMMARY_CONTEXT_SIZE,
+   DEFAULT_SUMMARY_LENGTH_RECOMMENDED,
    DEFAULT_MAX_ASYNC,
    DEFAULT_MAX_PARALLEL_INSERT,
    DEFAULT_MAX_GRAPH_NODES,
@@ -285,8 +287,20 @@ class LightRAG:
    summary_max_tokens: int = field(
        default=int(os.getenv("SUMMARY_MAX_TOKENS", DEFAULT_SUMMARY_MAX_TOKENS))
    )
    """Maximum tokens allowed for an entity/relation description."""

+   summary_context_size: int = field(
+       default=int(os.getenv("SUMMARY_CONTEXT_SIZE", DEFAULT_SUMMARY_CONTEXT_SIZE))
+   )
+   """Maximum number of tokens sent to the LLM per summarization call."""
+
+   summary_length_recommended: int = field(
+       default=int(
+           os.getenv("SUMMARY_LENGTH_RECOMMENDED", DEFAULT_SUMMARY_LENGTH_RECOMMENDED)
+       )
+   )
+   """Recommended length of LLM summary output."""
+
    llm_model_max_async: int = field(
        default=int(os.getenv("MAX_ASYNC", DEFAULT_MAX_ASYNC))
    )
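Outside the diff: because these `field` defaults call `os.getenv` when the dataclass is defined, environment overrides must be in place before `lightrag` is imported. A sketch:

```python
import os

os.environ["SUMMARY_CONTEXT_SIZE"] = "8000"  # must precede the import below

from lightrag import LightRAG  # defaults are captured at import time

rag = LightRAG(working_dir="./rag_storage")
assert rag.summary_context_size == 8000
```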
@@ -416,6 +430,20 @@ class LightRAG:
        if self.ollama_server_infos is None:
            self.ollama_server_infos = OllamaServerInfos()

+       # Validate config
+       if self.force_llm_summary_on_merge < 3:
+           logger.warning(
+               f"force_llm_summary_on_merge should be at least 3, got {self.force_llm_summary_on_merge}"
+           )
+       if self.summary_context_size > self.max_total_tokens:
+           logger.warning(
+               f"summary_context_size({self.summary_context_size}) should be no greater than max_total_tokens({self.max_total_tokens})"
+           )
+       if self.summary_length_recommended > self.summary_max_tokens:
+           logger.warning(
+               f"summary_max_tokens({self.summary_max_tokens}) should be greater than summary_length_recommended({self.summary_length_recommended})"
+           )

        # Fix global_config now
        global_config = asdict(self)
@@ -2272,117 +2300,111 @@ class LightRAG:
        relationships_to_delete = set()
        relationships_to_rebuild = {}  # (src, tgt) -> remaining_chunk_ids

-       # Use graph database lock to ensure atomic merges and updates
-       try:
-           # Get affected entities and relations from full_entities and full_relations storage
-           doc_entities_data = await self.full_entities.get_by_id(doc_id)
-           doc_relations_data = await self.full_relations.get_by_id(doc_id)
-
-           affected_nodes = []
-           affected_edges = []
-
-           # Get entity data from graph storage using entity names from full_entities
-           if doc_entities_data and "entity_names" in doc_entities_data:
-               entity_names = doc_entities_data["entity_names"]
-               # get_nodes_batch returns dict[str, dict], need to convert to list[dict]
-               nodes_dict = await self.chunk_entity_relation_graph.get_nodes_batch(
-                   entity_names
-               )
-               for entity_name in entity_names:
-                   node_data = nodes_dict.get(entity_name)
-                   if node_data:
-                       # Ensure compatibility with existing logic that expects "id" field
-                       if "id" not in node_data:
-                           node_data["id"] = entity_name
-                       affected_nodes.append(node_data)
-
-           # Get relation data from graph storage using relation pairs from full_relations
-           if doc_relations_data and "relation_pairs" in doc_relations_data:
-               relation_pairs = doc_relations_data["relation_pairs"]
-               edge_pairs_dicts = [
-                   {"src": pair[0], "tgt": pair[1]} for pair in relation_pairs
-               ]
-               # get_edges_batch returns dict[tuple[str, str], dict], need to convert to list[dict]
-               edges_dict = await self.chunk_entity_relation_graph.get_edges_batch(
-                   edge_pairs_dicts
-               )
-
-               for pair in relation_pairs:
-                   src, tgt = pair[0], pair[1]
-                   edge_key = (src, tgt)
-                   edge_data = edges_dict.get(edge_key)
-                   if edge_data:
-                       # Ensure compatibility with existing logic that expects "source" and "target" fields
-                       if "source" not in edge_data:
-                           edge_data["source"] = src
-                       if "target" not in edge_data:
-                           edge_data["target"] = tgt
-                       affected_edges.append(edge_data)
-
-       except Exception as e:
-           logger.error(f"Failed to analyze affected graph elements: {e}")
-           raise Exception(f"Failed to analyze graph dependencies: {e}") from e
-
-       try:
-           # Process entities
-           for node_data in affected_nodes:
-               node_label = node_data.get("entity_id")
-               if node_label and "source_id" in node_data:
-                   sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
-                   remaining_sources = sources - chunk_ids
-
-                   if not remaining_sources:
-                       entities_to_delete.add(node_label)
-                   elif remaining_sources != sources:
-                       entities_to_rebuild[node_label] = remaining_sources
-
-           async with pipeline_status_lock:
-               log_message = f"Found {len(entities_to_rebuild)} affected entities"
-               logger.info(log_message)
-               pipeline_status["latest_message"] = log_message
-               pipeline_status["history_messages"].append(log_message)
-
-           # Process relationships
-           for edge_data in affected_edges:
-               src = edge_data.get("source")
-               tgt = edge_data.get("target")
-
-               if src and tgt and "source_id" in edge_data:
-                   edge_tuple = tuple(sorted((src, tgt)))
-                   if (
-                       edge_tuple in relationships_to_delete
-                       or edge_tuple in relationships_to_rebuild
-                   ):
-                       continue
-
-                   sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
-                   remaining_sources = sources - chunk_ids
-
-                   if not remaining_sources:
-                       relationships_to_delete.add(edge_tuple)
-                   elif remaining_sources != sources:
-                       relationships_to_rebuild[edge_tuple] = remaining_sources
-
-           async with pipeline_status_lock:
-               log_message = (
-                   f"Found {len(relationships_to_rebuild)} affected relations"
-               )
-               logger.info(log_message)
-               pipeline_status["latest_message"] = log_message
-               pipeline_status["history_messages"].append(log_message)
-
-       except Exception as e:
-           logger.error(f"Failed to process graph analysis results: {e}")
-           raise Exception(f"Failed to process graph dependencies: {e}") from e
+       # Use graph database lock to prevent dirty read
+       graph_db_lock = get_graph_db_lock(enable_logging=False)
+       async with graph_db_lock:
+           try:
+               # Get affected entities and relations from full_entities and full_relations storage
+               doc_entities_data = await self.full_entities.get_by_id(doc_id)
+               doc_relations_data = await self.full_relations.get_by_id(doc_id)
+
+               affected_nodes = []
+               affected_edges = []
+
+               # Get entity data from graph storage using entity names from full_entities
+               if doc_entities_data and "entity_names" in doc_entities_data:
+                   entity_names = doc_entities_data["entity_names"]
+                   # get_nodes_batch returns dict[str, dict], need to convert to list[dict]
+                   nodes_dict = (
+                       await self.chunk_entity_relation_graph.get_nodes_batch(
+                           entity_names
+                       )
+                   )
+                   for entity_name in entity_names:
+                       node_data = nodes_dict.get(entity_name)
+                       if node_data:
+                           # Ensure compatibility with existing logic that expects "id" field
+                           if "id" not in node_data:
+                               node_data["id"] = entity_name
+                           affected_nodes.append(node_data)
+
+               # Get relation data from graph storage using relation pairs from full_relations
+               if doc_relations_data and "relation_pairs" in doc_relations_data:
+                   relation_pairs = doc_relations_data["relation_pairs"]
+                   edge_pairs_dicts = [
+                       {"src": pair[0], "tgt": pair[1]} for pair in relation_pairs
+                   ]
+                   # get_edges_batch returns dict[tuple[str, str], dict], need to convert to list[dict]
+                   edges_dict = (
+                       await self.chunk_entity_relation_graph.get_edges_batch(
+                           edge_pairs_dicts
+                       )
+                   )
+
+                   for pair in relation_pairs:
+                       src, tgt = pair[0], pair[1]
+                       edge_key = (src, tgt)
+                       edge_data = edges_dict.get(edge_key)
+                       if edge_data:
+                           # Ensure compatibility with existing logic that expects "source" and "target" fields
+                           if "source" not in edge_data:
+                               edge_data["source"] = src
+                           if "target" not in edge_data:
+                               edge_data["target"] = tgt
+                           affected_edges.append(edge_data)
+
+           except Exception as e:
+               logger.error(f"Failed to analyze affected graph elements: {e}")
+               raise Exception(f"Failed to analyze graph dependencies: {e}") from e
+
+           try:
+               # Process entities
+               for node_data in affected_nodes:
+                   node_label = node_data.get("entity_id")
+                   if node_label and "source_id" in node_data:
+                       sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
+                       remaining_sources = sources - chunk_ids
+
+                       if not remaining_sources:
+                           entities_to_delete.add(node_label)
+                       elif remaining_sources != sources:
+                           entities_to_rebuild[node_label] = remaining_sources
+
+               async with pipeline_status_lock:
+                   log_message = (
+                       f"Found {len(entities_to_rebuild)} affected entities"
+                   )
+                   logger.info(log_message)
+                   pipeline_status["latest_message"] = log_message
+                   pipeline_status["history_messages"].append(log_message)
+
+               # Process relationships
+               for edge_data in affected_edges:
+                   src = edge_data.get("source")
+                   tgt = edge_data.get("target")
+
+                   if src and tgt and "source_id" in edge_data:
+                       edge_tuple = tuple(sorted((src, tgt)))
+                       if (
+                           edge_tuple in relationships_to_delete
+                           or edge_tuple in relationships_to_rebuild
+                       ):
+                           continue
+
+                       sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
+                       remaining_sources = sources - chunk_ids
+
+                       if not remaining_sources:
+                           relationships_to_delete.add(edge_tuple)
+                       elif remaining_sources != sources:
+                           relationships_to_rebuild[edge_tuple] = remaining_sources
+
+               async with pipeline_status_lock:
+                   log_message = (
+                       f"Found {len(relationships_to_rebuild)} affected relations"
+                   )
+                   logger.info(log_message)
+                   pipeline_status["latest_message"] = log_message
+                   pipeline_status["history_messages"].append(log_message)
+
+           except Exception as e:
+               logger.error(f"Failed to process graph analysis results: {e}")
+               raise Exception(f"Failed to process graph dependencies: {e}") from e

            # 5. Delete chunks from storage
            if chunk_ids:
                try:
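Outside the diff, the wrap-in-lock pattern reduced to its shape (illustrative only; `asyncio.Lock` stands in for `get_graph_db_lock`):

```python
import asyncio

graph_db_lock = asyncio.Lock()  # stand-in for get_graph_db_lock(enable_logging=False)

async def delete_doc_atomically() -> None:
    async with graph_db_lock:
        # Both the dependency analysis and the mutation now run under one lock,
        # so a concurrent merge can never observe a partially deleted document.
        ...
```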
@@ -2453,27 +2475,28 @@ class LightRAG:
                logger.error(f"Failed to delete relationships: {e}")
                raise Exception(f"Failed to delete relationships: {e}") from e

-           # 8. Rebuild entities and relationships from remaining chunks
-           if entities_to_rebuild or relationships_to_rebuild:
-               try:
-                   await _rebuild_knowledge_from_chunks(
-                       entities_to_rebuild=entities_to_rebuild,
-                       relationships_to_rebuild=relationships_to_rebuild,
-                       knowledge_graph_inst=self.chunk_entity_relation_graph,
-                       entities_vdb=self.entities_vdb,
-                       relationships_vdb=self.relationships_vdb,
-                       text_chunks_storage=self.text_chunks,
-                       llm_response_cache=self.llm_response_cache,
-                       global_config=asdict(self),
-                       pipeline_status=pipeline_status,
-                       pipeline_status_lock=pipeline_status_lock,
-                   )
-                   # Persist changes to graph database before releasing graph database lock
-                   await self._insert_done()
-
-               except Exception as e:
-                   logger.error(f"Failed to rebuild knowledge from chunks: {e}")
-                   raise Exception(
-                       f"Failed to rebuild knowledge graph: {e}"
-                   ) from e
+           # 8. Rebuild entities and relationships from remaining chunks
+           if entities_to_rebuild or relationships_to_rebuild:
+               try:
+                   await _rebuild_knowledge_from_chunks(
+                       entities_to_rebuild=entities_to_rebuild,
+                       relationships_to_rebuild=relationships_to_rebuild,
+                       knowledge_graph_inst=self.chunk_entity_relation_graph,
+                       entities_vdb=self.entities_vdb,
+                       relationships_vdb=self.relationships_vdb,
+                       text_chunks_storage=self.text_chunks,
+                       llm_response_cache=self.llm_response_cache,
+                       global_config=asdict(self),
+                       pipeline_status=pipeline_status,
+                       pipeline_status_lock=pipeline_status_lock,
+                   )
+
+               except Exception as e:
+                   logger.error(f"Failed to rebuild knowledge from chunks: {e}")
+                   raise Exception(f"Failed to rebuild knowledge graph: {e}") from e

            # 9. Delete from full_entities and full_relations storage
            try:
@@ -114,48 +114,197 @@ def chunking_by_token_size(


async def _handle_entity_relation_summary(
+   description_type: str,
    entity_or_relation_name: str,
-   description: str,
+   description_list: list[str],
+   seperator: str,
    global_config: dict,
    llm_response_cache: BaseKVStorage | None = None,
) -> tuple[str, bool]:
    """Handle entity/relation description summarization using a map-reduce approach.

    This function summarizes a list of descriptions with a map-reduce strategy:
    1. If total tokens < summary_context_size and len(description_list) < force_llm_summary_on_merge, no summarization is needed
    2. If total tokens < summary_max_tokens, summarize with the LLM directly
    3. Otherwise, split the descriptions into chunks that fit within the token limits
    4. Summarize each chunk, then recursively process the resulting summaries
    5. Continue until a final summary fits within the token limits or the number of descriptions drops below force_llm_summary_on_merge

    Args:
        description_type: "Entity" or "Relation", used in the summary prompt
        entity_or_relation_name: Name of the entity or relation being summarized
        description_list: List of description strings to summarize
        seperator: String used to join descriptions when no LLM call is needed
        global_config: Global configuration containing tokenizer and limits
        llm_response_cache: Optional cache for LLM responses

    Returns:
        Tuple of (final_summarized_description_string, llm_was_used_boolean)
    """
    # Handle empty input
    if not description_list:
        return "", False

    # If there is only one description, return it directly (no need for an LLM call)
    if len(description_list) == 1:
        return description_list[0], False

    # Get configuration
    tokenizer: Tokenizer = global_config["tokenizer"]
    summary_context_size = global_config["summary_context_size"]
    summary_max_tokens = global_config["summary_max_tokens"]
    force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]

    current_list = description_list[:]  # Copy the list to avoid modifying the original
    llm_was_used = False  # Track whether the LLM was used during the entire process

    # Iterative map-reduce process
    while True:
        # Calculate total tokens in the current list
        total_tokens = sum(len(tokenizer.encode(desc)) for desc in current_list)

        # If the total length is within limits, perform final summarization
        if total_tokens <= summary_context_size or len(current_list) <= 2:
            if (
                len(current_list) < force_llm_summary_on_merge
                and total_tokens < summary_max_tokens
            ):
                # No LLM needed, just join the descriptions
                final_description = seperator.join(current_list)
                return final_description if final_description else "", llm_was_used
            else:
                if total_tokens > summary_context_size and len(current_list) <= 2:
                    logger.warning(
                        f"Summarizing {entity_or_relation_name}: Oversize description found"
                    )
                # Final summarization of the remaining descriptions - LLM will be used
                final_summary = await _summarize_descriptions(
                    description_type,
                    entity_or_relation_name,
                    current_list,
                    global_config,
                    llm_response_cache,
                )
                return final_summary, True  # LLM was used for final summarization

        # Need to split into chunks - Map phase
        # Ensure each chunk has a minimum of 2 descriptions to guarantee progress
        chunks = []
        current_chunk = []
        current_tokens = 0

        # current_list holds at least 3 descriptions at this point
        for i, desc in enumerate(current_list):
            desc_tokens = len(tokenizer.encode(desc))

            # If adding the current description would exceed the limit, finalize the current chunk
            if current_tokens + desc_tokens > summary_context_size and current_chunk:
                # Ensure we have at least 2 descriptions in the chunk (when possible)
                if len(current_chunk) == 1:
                    # Force-add one more description to ensure a minimum of 2 per chunk
                    current_chunk.append(desc)
                    chunks.append(current_chunk)
                    logger.warning(
                        f"Summarizing {entity_or_relation_name}: Oversize description found"
                    )
                    current_chunk = []  # next group starts empty
                    current_tokens = 0
                else:  # current_chunk is ready for summary in the reduce phase
                    chunks.append(current_chunk)
                    current_chunk = [desc]  # leave it for the next group
                    current_tokens = desc_tokens
            else:
                current_chunk.append(desc)
                current_tokens += desc_tokens

        # Add the last chunk if it exists
        if current_chunk:
            chunks.append(current_chunk)

        logger.info(
            f" Summarizing {entity_or_relation_name}: Map {len(current_list)} descriptions into {len(chunks)} groups"
        )

        # Reduce phase: summarize each group from chunks
        new_summaries = []
        for chunk in chunks:
            if len(chunk) == 1:
                # Optimization: single-description chunks don't need LLM summarization
                new_summaries.append(chunk[0])
            else:
                # Multiple descriptions need LLM summarization
                summary = await _summarize_descriptions(
                    description_type,
                    entity_or_relation_name,
                    chunk,
                    global_config,
                    llm_response_cache,
                )
                new_summaries.append(summary)
                llm_was_used = True  # Mark that the LLM was used in the reduce phase

        # Update the current list with the new summaries for the next iteration
        current_list = new_summaries


async def _summarize_descriptions(
    description_type: str,
    description_name: str,
    description_list: list[str],
    global_config: dict,
    llm_response_cache: BaseKVStorage | None = None,
) -> str:
-   """Handle entity relation summary
-   For each entity or relation, input is the combined description of already existing description and new description.
-   If too long, use LLM to summarize.
+   """Helper function to summarize a list of descriptions using the LLM.

    Args:
        description_type: "Entity" or "Relation", used in the summary prompt
        description_name: Name of the entity or relation being summarized
        description_list: List of description strings to summarize
        global_config: Global configuration containing the LLM function and settings
        llm_response_cache: Optional cache for LLM responses

    Returns:
        Summarized description string
    """
    use_llm_func: callable = global_config["llm_model_func"]
    # Apply higher priority (8) to entity/relation summary tasks
    use_llm_func = partial(use_llm_func, _priority=8)

-   tokenizer: Tokenizer = global_config["tokenizer"]
-   llm_max_tokens = global_config["summary_max_tokens"]

    language = global_config["addon_params"].get(
        "language", PROMPTS["DEFAULT_LANGUAGE"]
    )

-   tokens = tokenizer.encode(description)
-
-   ### summarize is not determined here anymore (It's determined by num_fragment now)
-   # if len(tokens) < summary_max_tokens:  # No need for summary
-   #     return description
+   summary_length_recommended = global_config["summary_length_recommended"]

    prompt_template = PROMPTS["summarize_entity_descriptions"]
-   use_description = tokenizer.decode(tokens[:llm_max_tokens])

+   # Join the descriptions and apply token-based truncation if necessary
+   joined_descriptions = "\n\n".join(description_list)
+   tokenizer = global_config["tokenizer"]
+   summary_context_size = global_config["summary_context_size"]
+
+   # Token-based truncation to ensure the input fits within limits
+   tokens = tokenizer.encode(joined_descriptions)
+   if len(tokens) > summary_context_size:
+       truncated_tokens = tokens[:summary_context_size]
+       joined_descriptions = tokenizer.decode(truncated_tokens)

    # Prepare context for the prompt
    context_base = dict(
-       entity_name=entity_or_relation_name,
-       description_list=use_description.split(GRAPH_FIELD_SEP),
+       description_type=description_type,
+       description_name=description_name,
+       description_list=joined_descriptions,
+       summary_length=summary_length_recommended,
        language=language,
    )
    use_prompt = prompt_template.format(**context_base)
-   logger.debug(f"Trigger summary: {entity_or_relation_name}")

+   logger.debug(
+       f"Summarizing {len(description_list)} descriptions for: {description_name}"
+   )

    # Use LLM function with cache (higher priority for summary generation)
    summary = await use_llm_func_with_cache(
        use_prompt,
        use_llm_func,
        llm_response_cache=llm_response_cache,
        # max_tokens=summary_max_tokens,
        cache_type="extract",
    )
    return summary
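Outside the diff, a toy run of the map-phase grouping above. A fake one-token-per-word tokenizer and a context size of 8 are assumptions chosen to force two groups:

```python
descs = ["alpha beta gamma", "delta epsilon", "zeta eta theta iota", "kappa"]

def n_tokens(text: str) -> int:  # stand-in for tokenizer.encode
    return len(text.split())

chunks, current, tokens = [], [], 0
for d in descs:
    t = n_tokens(d)
    if tokens + t > 8 and current:  # group is full: close it, start a new one
        chunks.append(current)
        current, tokens = [d], t
    else:
        current.append(d)
        tokens += t
if current:
    chunks.append(current)

print(chunks)  # [['alpha beta gamma', 'delta epsilon'], ['zeta eta theta iota', 'kappa']]
```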
@@ -413,7 +562,7 @@ async def _rebuild_knowledge_from_chunks(
                )
                rebuilt_entities_count += 1
                status_message = (
-                   f"Rebuilt entity: {entity_name} from {len(chunk_ids)} chunks"
+                   f"Rebuilt `{entity_name}` from {len(chunk_ids)} chunks"
                )
                logger.info(status_message)
                if pipeline_status is not None and pipeline_status_lock is not None:
@@ -422,7 +571,7 @@ async def _rebuild_knowledge_from_chunks(
                        pipeline_status["history_messages"].append(status_message)
            except Exception as e:
                failed_entities_count += 1
-               status_message = f"Failed to rebuild entity {entity_name}: {e}"
+               status_message = f"Failed to rebuild `{entity_name}`: {e}"
                logger.info(status_message)  # Per requirement, changed to info
                if pipeline_status is not None and pipeline_status_lock is not None:
                    async with pipeline_status_lock:
@@ -453,7 +602,9 @@ async def _rebuild_knowledge_from_chunks(
                    global_config=global_config,
                )
                rebuilt_relationships_count += 1
-               status_message = f"Rebuilt relationship: {src}->{tgt} from {len(chunk_ids)} chunks"
+               status_message = (
+                   f"Rebuilt `{src} - {tgt}` from {len(chunk_ids)} chunks"
+               )
                logger.info(status_message)
                if pipeline_status is not None and pipeline_status_lock is not None:
                    async with pipeline_status_lock:
@@ -461,7 +612,7 @@ async def _rebuild_knowledge_from_chunks(
                        pipeline_status["history_messages"].append(status_message)
            except Exception as e:
                failed_relationships_count += 1
-               status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}"
+               status_message = f"Failed to rebuild `{src} - {tgt}`: {e}"
                logger.info(status_message)  # Per requirement, changed to info
                if pipeline_status is not None and pipeline_status_lock is not None:
                    async with pipeline_status_lock:
@@ -525,14 +676,20 @@ async def _get_cached_extraction_results(
) -> dict[str, list[str]]:
    """Get cached extraction results for specific chunk IDs.

+   This function retrieves cached LLM extraction results for the given chunk IDs and returns
+   them sorted by creation time. The results are sorted at two levels:
+   1. Individual extraction results within each chunk are sorted by create_time (earliest first)
+   2. Chunks themselves are sorted by the create_time of their earliest extraction result
+
    Args:
        llm_response_cache: LLM response cache storage
        chunk_ids: Set of chunk IDs to get cached results for
-       text_chunks_data: Pre-loaded chunk data (optional, for performance)
-       text_chunks_storage: Text chunks storage (fallback if text_chunks_data is None)
+       text_chunks_storage: Text chunks storage for retrieving chunk data and LLM cache references

    Returns:
-       Dict mapping chunk_id -> list of extraction_result_text
+       Dict mapping chunk_id -> list of extraction_result_text, where:
+       - Keys (chunk_ids) are ordered by the create_time of their first extraction result
+       - Values (extraction results) are ordered by create_time within each chunk
    """
    cached_results = {}
|
@ -541,15 +698,13 @@ async def _get_cached_extraction_results(
|
|||
|
||||
# Read from storage
|
||||
chunk_data_list = await text_chunks_storage.get_by_ids(list(chunk_ids))
|
||||
for chunk_id, chunk_data in zip(chunk_ids, chunk_data_list):
|
||||
for chunk_data in chunk_data_list:
|
||||
if chunk_data and isinstance(chunk_data, dict):
|
||||
llm_cache_list = chunk_data.get("llm_cache_list", [])
|
||||
if llm_cache_list:
|
||||
all_cache_ids.update(llm_cache_list)
|
||||
else:
|
||||
logger.warning(
|
||||
f"Chunk {chunk_id} data is invalid or None: {type(chunk_data)}"
|
||||
)
|
||||
logger.warning(f"Chunk data is invalid or None: {chunk_data}")
|
||||
|
||||
if not all_cache_ids:
|
||||
logger.warning(f"No LLM cache IDs found for {len(chunk_ids)} chunk IDs")
|
||||
|
|
@@ -560,7 +715,7 @@ async def _get_cached_extraction_results(

    # Process cache entries and group by chunk_id
    valid_entries = 0
-   for cache_id, cache_entry in zip(all_cache_ids, cache_data_list):
+   for cache_entry in cache_data_list:
        if (
            cache_entry is not None
            and isinstance(cache_entry, dict)
@@ -580,16 +735,30 @@ async def _get_cached_extraction_results(
                # Store tuple with extraction result and creation time for sorting
                cached_results[chunk_id].append((extraction_result, create_time))

-   # Sort extraction results by create_time for each chunk
+   # Sort extraction results by create_time for each chunk and collect earliest times
+   chunk_earliest_times = {}
    for chunk_id in cached_results:
-       # Sort by create_time (x[1]), then extract only extraction_result (x[0])
        cached_results[chunk_id].sort(key=lambda x: x[1])
+       # Store the earliest create_time for this chunk (first item after sorting)
+       chunk_earliest_times[chunk_id] = cached_results[chunk_id][0][1]
+       # Extract only extraction_result (x[0])
        cached_results[chunk_id] = [item[0] for item in cached_results[chunk_id]]

-   logger.info(
-       f"Found {valid_entries} valid cache entries, {len(cached_results)} chunks with results"
-   )
-   return cached_results
+   # Sort cached_results by the earliest create_time of each chunk
+   sorted_chunk_ids = sorted(
+       chunk_earliest_times.keys(), key=lambda chunk_id: chunk_earliest_times[chunk_id]
+   )
+
+   # Rebuild cached_results in sorted order
+   sorted_cached_results = {}
+   for chunk_id in sorted_chunk_ids:
+       sorted_cached_results[chunk_id] = cached_results[chunk_id]
+
+   logger.info(
+       f"Found {valid_entries} valid cache entries, {len(sorted_cached_results)} chunks with results"
+   )
+   return sorted_cached_results


async def _parse_extraction_result(
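Outside the diff, the two-level ordering in miniature — results sort by create_time within each chunk, and chunks then order by their earliest result:

```python
cached = {
    "chunk-b": [("res-b2", 9.0), ("res-b1", 2.0)],
    "chunk-a": [("res-a1", 5.0)],
}
earliest = {}
for cid in cached:
    cached[cid].sort(key=lambda x: x[1])       # sort results by create_time
    earliest[cid] = cached[cid][0][1]          # remember the earliest per chunk
    cached[cid] = [r for r, _ in cached[cid]]  # keep only the result text
ordered = {cid: cached[cid] for cid in sorted(earliest, key=earliest.get)}
print(ordered)  # {'chunk-b': ['res-b1', 'res-b2'], 'chunk-a': ['res-a1']}
```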
@@ -690,15 +859,6 @@ async def _rebuild_single_entity(
        # Update entity in vector database
        entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-")

-       # Delete old vector record first
-       try:
-           await entities_vdb.delete([entity_vdb_id])
-       except Exception as e:
-           logger.debug(
-               f"Could not delete old entity vector record {entity_vdb_id}: {e}"
-           )
-
        # Insert new vector record
        entity_content = f"{entity_name}\n{final_description}"
        await entities_vdb.upsert(
            {
@@ -713,21 +873,6 @@ async def _rebuild_single_entity(
            }
        )

-   # Helper function to generate final description with optional LLM summary
-   async def _generate_final_description(combined_description: str) -> str:
-       force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
-       num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1
-
-       if num_fragment >= force_llm_summary_on_merge:
-           return await _handle_entity_relation_summary(
-               entity_name,
-               combined_description,
-               global_config,
-               llm_response_cache=llm_response_cache,
-           )
-       else:
-           return combined_description
-
    # Collect all entity data from relevant chunks
    all_entity_data = []
    for chunk_id in chunk_ids:
@@ -736,13 +881,13 @@ async def _rebuild_single_entity(

    if not all_entity_data:
        logger.warning(
-           f"No cached entity data found for {entity_name}, trying to rebuild from relationships"
+           f"No entity data found for `{entity_name}`, trying to rebuild from relationships"
        )

        # Get all edges connected to this entity
        edges = await knowledge_graph_inst.get_node_edges(entity_name)
        if not edges:
-           logger.warning(f"No relationships found for entity {entity_name}")
+           logger.warning(f"No relations attached to entity `{entity_name}`")
            return

        # Collect relationship data to extract entity information
@@ -760,10 +905,19 @@ async def _rebuild_single_entity(
                edge_file_paths = edge_data["file_path"].split(GRAPH_FIELD_SEP)
                file_paths.update(edge_file_paths)

-       # Generate description from relationships or fallback to current
-       if relationship_descriptions:
-           combined_description = GRAPH_FIELD_SEP.join(relationship_descriptions)
-           final_description = await _generate_final_description(combined_description)
+       # deduplicate descriptions
+       description_list = list(dict.fromkeys(relationship_descriptions))
+
+       # Generate final description from relationships or fallback to current
+       if description_list:
+           final_description, _ = await _handle_entity_relation_summary(
+               "Entity",
+               entity_name,
+               description_list,
+               GRAPH_FIELD_SEP,
+               global_config,
+               llm_response_cache=llm_response_cache,
+           )
        else:
            final_description = current_entity.get("description", "")
@@ -784,12 +938,9 @@ async def _rebuild_single_entity(
        if entity_data.get("file_path"):
            file_paths.add(entity_data["file_path"])

-   # Combine all descriptions
-   combined_description = (
-       GRAPH_FIELD_SEP.join(descriptions)
-       if descriptions
-       else current_entity.get("description", "")
-   )
+   # Remove duplicates while preserving order
+   description_list = list(dict.fromkeys(descriptions))
    entity_types = list(dict.fromkeys(entity_types))

    # Get most common entity type
    entity_type = (
@@ -798,8 +949,19 @@ async def _rebuild_single_entity(
        else current_entity.get("entity_type", "UNKNOWN")
    )

-   # Generate final description and update storage
-   final_description = await _generate_final_description(combined_description)
+   # Generate final description from entities or fallback to current
+   if description_list:
+       final_description, _ = await _handle_entity_relation_summary(
+           "Entity",
+           entity_name,
+           description_list,
+           GRAPH_FIELD_SEP,
+           global_config,
+           llm_response_cache=llm_response_cache,
+       )
+   else:
+       final_description = current_entity.get("description", "")

    await _update_entity_storage(final_description, entity_type, file_paths)
@@ -836,7 +998,7 @@ async def _rebuild_single_relationship(
    )

    if not all_relationship_data:
-       logger.warning(f"No cached relationship data found for {src}-{tgt}")
+       logger.warning(f"No relation data found for `{src}-{tgt}`")
        return

    # Merge descriptions and keywords
@@ -855,42 +1017,38 @@ async def _rebuild_single_relationship(
        if rel_data.get("file_path"):
            file_paths.add(rel_data["file_path"])

-   # Combine descriptions and keywords
-   combined_description = (
-       GRAPH_FIELD_SEP.join(descriptions)
-       if descriptions
-       else current_relationship.get("description", "")
-   )
+   # Remove duplicates while preserving order
+   description_list = list(dict.fromkeys(descriptions))
+   keywords = list(dict.fromkeys(keywords))

    combined_keywords = (
        ", ".join(set(keywords))
        if keywords
        else current_relationship.get("keywords", "")
    )
    # weight = (
    #     sum(weights) / len(weights)
    #     if weights
    #     else current_relationship.get("weight", 1.0)
    # )

    weight = sum(weights) if weights else current_relationship.get("weight", 1.0)

-   # Use summary if description has too many fragments
-   force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
-   num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1
-
-   if num_fragment >= force_llm_summary_on_merge:
-       final_description = await _handle_entity_relation_summary(
+   # Generate final description from relations or fallback to current
+   if description_list:
+       final_description, _ = await _handle_entity_relation_summary(
+           "Relation",
            f"{src}-{tgt}",
-           combined_description,
+           description_list,
+           GRAPH_FIELD_SEP,
            global_config,
            llm_response_cache=llm_response_cache,
        )
    else:
-       final_description = combined_description
+       # fallback to keep current (unchanged)
+       final_description = current_relationship.get("description", "")

    # Update relationship in graph storage
    updated_relationship_data = {
        **current_relationship,
-       "description": final_description,
+       "description": final_description
+       if final_description
+       else current_relationship.get("description", ""),
        "keywords": combined_keywords,
        "weight": weight,
        "source_id": GRAPH_FIELD_SEP.join(chunk_ids),
@@ -948,13 +1106,9 @@ async def _merge_nodes_then_upsert(
    already_node = await knowledge_graph_inst.get_node(entity_name)
    if already_node:
        already_entity_types.append(already_node["entity_type"])
-       already_source_ids.extend(
-           split_string_by_multi_markers(already_node["source_id"], [GRAPH_FIELD_SEP])
-       )
-       already_file_paths.extend(
-           split_string_by_multi_markers(already_node["file_path"], [GRAPH_FIELD_SEP])
-       )
-       already_description.append(already_node["description"])
+       already_source_ids.extend(already_node["source_id"].split(GRAPH_FIELD_SEP))
+       already_file_paths.extend(already_node["file_path"].split(GRAPH_FIELD_SEP))
+       already_description.extend(already_node["description"].split(GRAPH_FIELD_SEP))

    entity_type = sorted(
        Counter(
@@ -962,42 +1116,54 @@ async def _merge_nodes_then_upsert(
        ).items(),
        key=lambda x: x[1],
        reverse=True,
-   )[0][0]
-   description = GRAPH_FIELD_SEP.join(
-       sorted(set([dp["description"] for dp in nodes_data] + already_description))
-   )
+   )[0][0]  # Get the entity type with the highest count
+
+   # merge and deduplicate description
+   description_list = list(
+       dict.fromkeys(
+           already_description
+           + [dp["description"] for dp in nodes_data if dp.get("description")]
+       )
+   )
+
+   num_fragment = len(description_list)
+   already_fragment = len(already_description)
+   deduplicated_num = already_fragment + len(nodes_data) - num_fragment
+   if deduplicated_num > 0:
+       dd_message = f"(dd:{deduplicated_num})"
+   else:
+       dd_message = ""
+   if num_fragment > 0:
+       # Get summary and LLM usage status
+       description, llm_was_used = await _handle_entity_relation_summary(
+           "Entity",
+           entity_name,
+           description_list,
+           GRAPH_FIELD_SEP,
+           global_config,
+           llm_response_cache,
+       )
+
+       # Log based on actual LLM usage
+       if llm_was_used:
+           status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
+       else:
+           status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
+
+       logger.info(status_message)
+       if pipeline_status is not None and pipeline_status_lock is not None:
+           async with pipeline_status_lock:
+               pipeline_status["latest_message"] = status_message
+               pipeline_status["history_messages"].append(status_message)
+   else:
+       logger.error(f"Entity {entity_name} has no description")
+       description = "(no description)"

    source_id = GRAPH_FIELD_SEP.join(
        set([dp["source_id"] for dp in nodes_data] + already_source_ids)
    )
    file_path = build_file_path(already_file_paths, nodes_data, entity_name)

-   force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
-
-   num_fragment = description.count(GRAPH_FIELD_SEP) + 1
-   num_new_fragment = len(set([dp["description"] for dp in nodes_data]))
-
-   if num_fragment > 1:
-       if num_fragment >= force_llm_summary_on_merge:
-           status_message = f"LLM merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}"
-           logger.info(status_message)
-           if pipeline_status is not None and pipeline_status_lock is not None:
-               async with pipeline_status_lock:
-                   pipeline_status["latest_message"] = status_message
-                   pipeline_status["history_messages"].append(status_message)
-           description = await _handle_entity_relation_summary(
-               entity_name,
-               description,
-               global_config,
-               llm_response_cache,
-           )
-       else:
-           status_message = f"Merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}"
-           logger.info(status_message)
-           if pipeline_status is not None and pipeline_status_lock is not None:
-               async with pipeline_status_lock:
-                   pipeline_status["latest_message"] = status_message
-                   pipeline_status["history_messages"].append(status_message)

    node_data = dict(
        entity_id=entity_name,
        entity_type=entity_type,
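Outside the diff: the `dict.fromkeys` idiom adopted above deduplicates while preserving first-seen order, which the previous `sorted(set(...))` approach did not:

```python
fragments = ["beta", "alpha", "beta", "gamma"]
print(list(dict.fromkeys(fragments)))  # ['beta', 'alpha', 'gamma'] — first-seen order kept
print(sorted(set(fragments)))          # ['alpha', 'beta', 'gamma'] — original order lost
```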
@@ -1044,22 +1210,20 @@ async def _merge_edges_then_upsert(
        # Get source_id with empty string default if missing or None
        if already_edge.get("source_id") is not None:
            already_source_ids.extend(
-               split_string_by_multi_markers(
-                   already_edge["source_id"], [GRAPH_FIELD_SEP]
-               )
+               already_edge["source_id"].split(GRAPH_FIELD_SEP)
            )

        # Get file_path with empty string default if missing or None
        if already_edge.get("file_path") is not None:
            already_file_paths.extend(
-               split_string_by_multi_markers(
-                   already_edge["file_path"], [GRAPH_FIELD_SEP]
-               )
+               already_edge["file_path"].split(GRAPH_FIELD_SEP)
            )

        # Get description with empty string default if missing or None
        if already_edge.get("description") is not None:
-           already_description.append(already_edge["description"])
+           already_description.extend(
+               already_edge["description"].split(GRAPH_FIELD_SEP)
+           )

        # Get keywords with empty string default if missing or None
        if already_edge.get("keywords") is not None:
@@ -1071,15 +1235,47 @@ async def _merge_edges_then_upsert(

    # Process edges_data with None checks
    weight = sum([dp["weight"] for dp in edges_data] + already_weights)
-   description = GRAPH_FIELD_SEP.join(
-       sorted(
-           set(
-               [dp["description"] for dp in edges_data if dp.get("description")]
-               + already_description
-           )
-       )
-   )
+   description_list = list(
+       dict.fromkeys(
+           already_description
+           + [dp["description"] for dp in edges_data if dp.get("description")]
+       )
+   )
+
+   num_fragment = len(description_list)
+   already_fragment = len(already_description)
+   deduplicated_num = already_fragment + len(edges_data) - num_fragment
+   if deduplicated_num > 0:
+       dd_message = f"(dd:{deduplicated_num})"
+   else:
+       dd_message = ""
+   if num_fragment > 0:
+       # Get summary and LLM usage status
+       description, llm_was_used = await _handle_entity_relation_summary(
+           "Relation",
+           f"({src_id}, {tgt_id})",
+           description_list,
+           GRAPH_FIELD_SEP,
+           global_config,
+           llm_response_cache,
+       )
+
+       # Log based on actual LLM usage
+       if llm_was_used:
+           status_message = f"LLMmrg: `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
+       else:
+           status_message = f"Merged: `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
+
+       logger.info(status_message)
+       if pipeline_status is not None and pipeline_status_lock is not None:
+           async with pipeline_status_lock:
+               pipeline_status["latest_message"] = status_message
+               pipeline_status["history_messages"].append(status_message)
+   else:
+       logger.error(f"Edge {src_id} - {tgt_id} has no description")
+       description = "(no description)"

    # Split all existing and new keywords into individual terms, then combine and deduplicate
    all_keywords = set()
    # Process already_keywords (which are comma-separated)
@@ -1127,35 +1323,6 @@ async def _merge_edges_then_upsert(
                }
                added_entities.append(entity_data)

-   force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
-
-   num_fragment = description.count(GRAPH_FIELD_SEP) + 1
-   num_new_fragment = len(
-       set([dp["description"] for dp in edges_data if dp.get("description")])
-   )
-
-   if num_fragment > 1:
-       if num_fragment >= force_llm_summary_on_merge:
-           status_message = f"LLM merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}"
-           logger.info(status_message)
-           if pipeline_status is not None and pipeline_status_lock is not None:
-               async with pipeline_status_lock:
-                   pipeline_status["latest_message"] = status_message
-                   pipeline_status["history_messages"].append(status_message)
-           description = await _handle_entity_relation_summary(
-               f"({src_id}, {tgt_id})",
-               description,
-               global_config,
-               llm_response_cache,
-           )
-       else:
-           status_message = f"Merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}"
-           logger.info(status_message)
-           if pipeline_status is not None and pipeline_status_lock is not None:
-               async with pipeline_status_lock:
-                   pipeline_status["latest_message"] = status_message
-                   pipeline_status["history_messages"].append(status_message)

    await knowledge_graph_inst.upsert_edge(
        src_id,
        tgt_id,
@@ -1463,7 +1630,7 @@ async def merge_nodes_and_edges(
            )
            # Don't raise exception to avoid affecting main flow

-   log_message = f"Completed merging: {len(processed_entities)} entities, {len(all_added_entities)} added entities, {len(processed_edges)} relations"
+   log_message = f"Completed merging: {len(processed_entities)} entities, {len(all_added_entities)} extra entities, {len(processed_edges)} relations"
    logger.info(log_message)
    async with pipeline_status_lock:
        pipeline_status["latest_message"] = log_message
@@ -40,22 +40,19 @@ Format the content-level key words as ("content_keywords"{tuple_delimiter}<high_
5. When finished, output {completion_delimiter}

-######################
---Examples---
-######################
{examples}

-#############################
---Real Data---
-######################
Entity_types: [{entity_types}]
Text:
{input_text}
-######################

+---Output---
Output:"""

PROMPTS["entity_extraction_examples"] = [
-   """Example 1:
+   """------Example 1------

Entity_types: [person, technology, mission, organization, location]
Text:
@@ -81,8 +78,9 @@ Output:
("relationship"{tuple_delimiter}"Jordan"{tuple_delimiter}"Cruz"{tuple_delimiter}"Jordan's commitment to discovery is in rebellion against Cruz's vision of control and order."{tuple_delimiter}"ideological conflict, rebellion"{tuple_delimiter}5){record_delimiter}
("relationship"{tuple_delimiter}"Taylor"{tuple_delimiter}"The Device"{tuple_delimiter}"Taylor shows reverence towards the device, indicating its importance and potential impact."{tuple_delimiter}"reverence, technological significance"{tuple_delimiter}9){record_delimiter}
("content_keywords"{tuple_delimiter}"power dynamics, ideological conflict, discovery, rebellion"){completion_delimiter}
-#############################""",
-"""Example 2:
+
+""",
+"""------Example 2------

Entity_types: [company, index, commodity, market_trend, economic_policy, biological]
Text:
@@ -109,8 +107,9 @@ Output:
("relationship"{tuple_delimiter}"Gold Futures"{tuple_delimiter}"Market Selloff"{tuple_delimiter}"Gold prices rose as investors sought safe-haven assets during the market selloff."{tuple_delimiter}"market reaction, safe-haven investment"{tuple_delimiter}10){record_delimiter}
("relationship"{tuple_delimiter}"Federal Reserve Policy Announcement"{tuple_delimiter}"Market Selloff"{tuple_delimiter}"Speculation over Federal Reserve policy changes contributed to market volatility and investor selloff."{tuple_delimiter}"interest rate impact, financial regulation"{tuple_delimiter}7){record_delimiter}
("content_keywords"{tuple_delimiter}"market downturn, investor sentiment, commodities, Federal Reserve, stock performance"){completion_delimiter}
-#############################""",
-"""Example 3:
+
+""",
+"""------Example 3------

Entity_types: [economic_policy, athlete, event, location, record, organization, equipment]
Text:
@@ -130,23 +129,29 @@ Output:
("relationship"{tuple_delimiter}"Noah Carter"{tuple_delimiter}"Carbon-Fiber Spikes"{tuple_delimiter}"Noah Carter used carbon-fiber spikes to enhance performance during the race."{tuple_delimiter}"athletic equipment, performance boost"{tuple_delimiter}7){record_delimiter}
("relationship"{tuple_delimiter}"World Athletics Federation"{tuple_delimiter}"100m Sprint Record"{tuple_delimiter}"The World Athletics Federation is responsible for validating and recognizing new sprint records."{tuple_delimiter}"sports regulation, record certification"{tuple_delimiter}9){record_delimiter}
("content_keywords"{tuple_delimiter}"athletics, sprinting, record-breaking, sports technology, competition"){completion_delimiter}
-#############################""",
+
+""",
]

-PROMPTS[
-    "summarize_entity_descriptions"
-] = """You are a helpful assistant responsible for generating a comprehensive summary of the data provided below.
-Given one or two entities, and a list of descriptions, all related to the same entity or group of entities.
-Please concatenate all of these into a single, comprehensive description. Make sure to include information collected from all the descriptions.
-If the provided descriptions are contradictory, please resolve the contradictions and provide a single, coherent summary.
-Make sure it is written in third person, and include the entity names so we have the full context.
-Use {language} as output language.
+PROMPTS["summarize_entity_descriptions"] = """---Role---
+You are a Knowledge Graph Specialist responsible for data curation and synthesis.
+
+---Task---
+Your task is to synthesize a list of descriptions of a given entity or relation into a single, comprehensive, and cohesive summary.
+
+---Instructions---
+1. **Comprehensiveness:** The summary must integrate key information from all provided descriptions. Do not omit important facts.
+2. **Context:** The summary must explicitly mention the name of the entity or relation for full context.
+3. **Style:** The output must be written from an objective, third-person perspective.
+4. **Length:** Maintain depth and completeness while ensuring the summary's length does not exceed {summary_length} tokens.
+5. **Language:** The entire output must be written in {language}.

#######
---Data---
-Entities: {entity_name}
-Description List: {description_list}
+{description_type} Name: {description_name}
+Description List:
+{description_list}
#######

---Output---
Output:
"""
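Outside the diff, a quick sketch of filling the revised template's placeholders with toy values (the `lightrag.prompt` import path is an assumption):

```python
from lightrag.prompt import PROMPTS  # import path assumed

prompt = PROMPTS["summarize_entity_descriptions"].format(
    description_type="Entity",
    description_name="Alan Turing",
    description_list="Mathematician.\n\nBroke the Enigma cipher.",
    summary_length=600,  # mirrors SUMMARY_LENGTH_RECOMMENDED
    language="English",
)
print(prompt[:80])  # inspect the rendered prompt head
```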
@@ -188,8 +193,7 @@ PROMPTS["entity_if_loop_extraction"] = """
It appears some entities may have still been missed.

---Output---
-
Answer ONLY by `YES` OR `NO` if there are still entities that need to be added.
+Output:
""".strip()

PROMPTS["fail_response"] = (
@@ -211,7 +215,7 @@ Generate a concise response based on Knowledge Base and follow Response Rules, c
---Knowledge Graph and Document Chunks---
{context_data}

----RESPONSE GUIDELINES---
+---Response Guidelines---
**1. Content & Adherence:**
- Strictly adhere to the provided context from the Knowledge Base. Do not invent, assume, or include any information not present in the source data.
- If the answer cannot be found in the provided context, state that you do not have enough information to answer.
@@ -233,8 +237,8 @@ Generate a concise response based on Knowledge Base and follow Response Rules, c
---USER CONTEXT---
- Additional user prompt: {user_prompt}

-Response:"""
+---Response---
+Output:"""

PROMPTS["keywords_extraction"] = """---Role---
You are an expert keyword extractor, specializing in analyzing user queries for a Retrieval-Augmented Generation (RAG) system. Your purpose is to identify both high-level and low-level keywords in the user's query that will be used for effective document retrieval.
@@ -257,7 +261,7 @@ Given a user query, your task is to extract two distinct types of keywords:
User Query: {query}

---Output---
-"""
+Output:"""

PROMPTS["keywords_extraction_examples"] = [
    """Example 1:
@@ -327,5 +331,5 @@ Generate a concise response based on Document Chunks and follow Response Rules,
---USER CONTEXT---
- Additional user prompt: {user_prompt}

-Response:"""
+---Response---
+Output:"""
@@ -35,7 +35,6 @@ export type LightragStatus = {
  embedding_binding: string
  embedding_binding_host: string
  embedding_model: string
- max_tokens: number
  kv_storage: string
  doc_status_storage: string
  graph_storage: string