Merge branch 'main' into entityTypesServerSupport

This commit is contained in:
Thibo Rosemplatt 2025-08-26 21:48:20 +02:00
commit c3aabfc251
19 changed files with 703 additions and 462 deletions

View file

@ -268,7 +268,8 @@ if __name__ == "__main__":
| **embedding_func_max_async** | `int` | Maximum number of concurrent asynchronous embedding processes | `16` |
| **llm_model_func** | `callable` | Function for LLM generation | `gpt_4o_mini_complete` |
| **llm_model_name** | `str` | LLM model name for generation | `meta-llama/Llama-3.2-1B-Instruct` |
| **summary_max_tokens** | `int` | Maximum tokens sent to the LLM when generating entity/relation summaries | `30000` (configured by env var SUMMARY_MAX_TOKENS) |
| **summary_context_size** | `int` | Maximum tokens sent to the LLM when merging entity/relation summaries | `10000` (configured by env var SUMMARY_CONTEXT_SIZE) |
| **summary_max_tokens** | `int` | Maximum token length of a merged entity/relation description | `500` (configured by env var SUMMARY_MAX_TOKENS) |
| **llm_model_max_async** | `int` | Maximum number of concurrent asynchronous LLM processes | `4` (default value changed by env var MAX_ASYNC) |
| **llm_model_kwargs** | `dict` | Additional parameters for LLM generation | |
| **vector_db_storage_cls_kwargs** | `dict` | Additional parameters for the vector database, such as thresholds for node and relation retrieval | cosine_better_than_threshold: 0.2 (default value changed by env var COSINE_THRESHOLD) |
@ -598,9 +599,9 @@ if __name__ == "__main__":
To improve retrieval quality, documents can be re-ranked using a more effective relevance-scoring model. The `rerank.py` file provides driver functions for three reranker providers:
* **Cohere / vLLM**: `cohere_rerank`
* **Jina AI**: `jina_rerank`
* **Aliyun (Alibaba Cloud)**: `ali_rerank`
* **Cohere / vLLM**: `cohere_rerank`
* **Jina AI**: `jina_rerank`
* **Aliyun (Alibaba Cloud)**: `ali_rerank`
You can inject one of these functions into the `rerank_model_func` attribute of the LightRAG object. This lets LightRAG's query functions use the injected function to re-rank retrieved text chunks. See `examples/rerank_example.py` for detailed usage.
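For orientation, here is a minimal sketch of that injection. The import paths and helper names (`lightrag.rerank`, `lightrag.llm.openai`) are assumptions for illustration and are not part of this diff:

```python
# Hedged sketch: wire one of the rerank drivers into LightRAG.
from lightrag import LightRAG
from lightrag.llm.openai import gpt_4o_mini_complete, openai_embed  # assumed helper paths
from lightrag.rerank import cohere_rerank  # or jina_rerank / ali_rerank

rag = LightRAG(
    working_dir="./rag_storage",
    llm_model_func=gpt_4o_mini_complete,
    embedding_func=openai_embed,
    rerank_model_func=cohere_rerank,  # retrieved chunks are re-ranked at query time
)
```

Provider credentials (API key, endpoint, model) are typically supplied via environment variables or bound with `functools.partial` before injection; `examples/rerank_example.py` remains the authoritative reference.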

View file

@ -275,7 +275,8 @@ A full list of LightRAG init parameters:
| **embedding_func_max_async** | `int` | Maximum number of concurrent asynchronous embedding processes | `16` |
| **llm_model_func** | `callable` | Function for LLM generation | `gpt_4o_mini_complete` |
| **llm_model_name** | `str` | LLM model name for generation | `meta-llama/Llama-3.2-1B-Instruct` |
| **summary_max_tokens** | `int` | Maximum tokens sent to the LLM to generate entity/relation summaries | `30000` (configured by env var SUMMARY_MAX_TOKENS) |
| **summary_context_size** | `int` | Maximum tokens sent to the LLM when generating summaries for entity/relation merging | `10000` (configured by env var SUMMARY_CONTEXT_SIZE) |
| **summary_max_tokens** | `int` | Maximum token size for an entity/relation description | `500` (configured by env var SUMMARY_MAX_TOKENS) |
| **llm_model_max_async** | `int` | Maximum number of concurrent asynchronous LLM processes | `4` (default value changed by env var MAX_ASYNC) |
| **llm_model_kwargs** | `dict` | Additional parameters for LLM generation | |
| **vector_db_storage_cls_kwargs** | `dict` | Additional parameters for vector database, like setting the threshold for nodes and relations retrieval | cosine_better_than_threshold: 0.2 (default value changed by env var COSINE_THRESHOLD) |
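For orientation, a hedged sketch of setting the summary-related parameters from this table explicitly at construction time (values are illustrative; the OpenAI helper imports are assumptions based on the defaults named above):

```python
# Illustrative only: explicit values for the summary-related init parameters.
from lightrag import LightRAG
from lightrag.llm.openai import gpt_4o_mini_complete, openai_embed  # assumed helper paths

rag = LightRAG(
    working_dir="./rag_storage",
    llm_model_func=gpt_4o_mini_complete,
    embedding_func=openai_embed,
    summary_context_size=10000,  # max tokens sent to the LLM when merging summaries
    summary_max_tokens=500,      # max token size of a merged entity/relation description
    llm_model_max_async=4,       # concurrent asynchronous LLM calls
)
```

Leaving these unset falls back to the environment variables listed in the table.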

View file

@ -125,12 +125,15 @@ ENABLE_LLM_CACHE_FOR_EXTRACT=true
### Chunk size for document splitting, 500~1500 is recommended
# CHUNK_SIZE=1200
# CHUNK_OVERLAP_SIZE=100
### Entity and relation summarization configuration
### Number of duplicated entities/edges to trigger LLM re-summary on merge (at least 3 is recommended) and max tokens sent to LLM
# FORCE_LLM_SUMMARY_ON_MERGE=4
# SUMMARY_MAX_TOKENS=30000
### Maximum number of entity extraction attempts for ambiguous content
# MAX_GLEANING=1
### Number of summary segments or tokens to trigger LLM summary on entity/relation merge (at least 3 is recommended)
# FORCE_LLM_SUMMARY_ON_MERGE=8
### Max description token size to trigger LLM summary
# SUMMARY_MAX_TOKENS=1200
### Recommended LLM summary output length in tokens
# SUMMARY_LENGTH_RECOMMENDED=600
### Maximum context size sent to LLM for description summary
# SUMMARY_CONTEXT_SIZE=12000
### Customize the entities that the LLM will attempt to recognize
# ENTITY_TYPES=["person", "organization", "location", "event", "concept"]
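These variables interact: a merged entity/relation description is sent to the LLM for re-summarization only once it grows past either threshold; otherwise the fragments are simply joined. A rough paraphrase of that trigger (see `_handle_entity_relation_summary` in `operate.py` later in this diff; this helper is illustrative, not part of the library):

```python
# Illustrative paraphrase of the merge-time summarization trigger.
def needs_llm_summary(
    num_fragments: int,       # deduplicated description fragments after merge
    total_tokens: int,        # token count of all fragments combined
    force_on_merge: int = 8,  # FORCE_LLM_SUMMARY_ON_MERGE
    max_tokens: int = 1200,   # SUMMARY_MAX_TOKENS
) -> bool:
    # Fragments are joined as-is unless either limit is exceeded.
    return num_fragments >= force_on_merge or total_tokens >= max_tokens
```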

View file

@ -1 +1 @@
__api_version__ = "0207"
__api_version__ = "0208"

View file

@ -30,6 +30,8 @@ from lightrag.constants import (
DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE,
DEFAULT_MAX_ASYNC,
DEFAULT_SUMMARY_MAX_TOKENS,
DEFAULT_SUMMARY_LENGTH_RECOMMENDED,
DEFAULT_SUMMARY_CONTEXT_SIZE,
DEFAULT_SUMMARY_LANGUAGE,
DEFAULT_EMBEDDING_FUNC_MAX_ASYNC,
DEFAULT_EMBEDDING_BATCH_NUM,
@ -120,10 +122,26 @@ def parse_args() -> argparse.Namespace:
help=f"Maximum async operations (default: from env or {DEFAULT_MAX_ASYNC})",
)
parser.add_argument(
"--max-tokens",
"--summary-max-tokens",
type=int,
default=get_env_value("SUMMARY_MAX_TOKENS", DEFAULT_SUMMARY_MAX_TOKENS, int),
help=f"Maximum token size (default: from env or {DEFAULT_SUMMARY_MAX_TOKENS})",
help=f"Maximum token size for entity/relation summary(default: from env or {DEFAULT_SUMMARY_MAX_TOKENS})",
)
parser.add_argument(
"--summary-context-size",
type=int,
default=get_env_value(
"SUMMARY_CONTEXT_SIZE", DEFAULT_SUMMARY_CONTEXT_SIZE, int
),
help=f"LLM Summary Context size (default: from env or {DEFAULT_SUMMARY_CONTEXT_SIZE})",
)
parser.add_argument(
"--summary-length-recommended",
type=int,
default=get_env_value(
"SUMMARY_LENGTH_RECOMMENDED", DEFAULT_SUMMARY_LENGTH_RECOMMENDED, int
),
help=f"LLM Summary Context size (default: from env or {DEFAULT_SUMMARY_LENGTH_RECOMMENDED})",
)
# Logging configuration

View file

@ -2,7 +2,7 @@
LightRAG FastAPI Server
"""
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi import FastAPI, Depends, HTTPException
import asyncio
import os
import logging
@ -248,6 +248,7 @@ def create_app(args):
azure_openai_complete_if_cache,
azure_openai_embed,
)
from lightrag.llm.binding_options import OpenAILLMOptions
if args.llm_binding == "aws_bedrock" or args.embedding_binding == "aws_bedrock":
from lightrag.llm.bedrock import bedrock_complete_if_cache, bedrock_embed
if args.embedding_binding == "ollama":
@ -471,7 +472,8 @@ def create_app(args):
),
llm_model_name=args.llm_model,
llm_model_max_async=args.max_async,
summary_max_tokens=args.max_tokens,
summary_max_tokens=args.summary_max_tokens,
summary_context_size=args.summary_context_size,
chunk_token_size=int(args.chunk_size),
chunk_overlap_token_size=int(args.chunk_overlap_size),
llm_model_kwargs=(
@ -509,7 +511,8 @@ def create_app(args):
chunk_overlap_token_size=int(args.chunk_overlap_size),
llm_model_name=args.llm_model,
llm_model_max_async=args.max_async,
summary_max_tokens=args.max_tokens,
summary_max_tokens=args.summary_max_tokens,
summary_context_size=args.summary_context_size,
embedding_func=embedding_func,
kv_storage=args.kv_storage,
graph_storage=args.graph_storage,
@ -596,9 +599,7 @@ def create_app(args):
}
username = form_data.username
if auth_handler.accounts.get(username) != form_data.password:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED, detail="Incorrect credentials"
)
raise HTTPException(status_code=401, detail="Incorrect credentials")
# Regular user login
user_token = auth_handler.create_token(
@ -641,7 +642,8 @@ def create_app(args):
"embedding_binding": args.embedding_binding,
"embedding_binding_host": args.embedding_binding_host,
"embedding_model": args.embedding_model,
"max_tokens": args.max_tokens,
"summary_max_tokens": args.summary_max_tokens,
"summary_context_size": args.summary_context_size,
"kv_storage": args.kv_storage,
"doc_status_storage": args.doc_status_storage,
"graph_storage": args.graph_storage,

View file

@ -242,8 +242,8 @@ def display_splash_screen(args: argparse.Namespace) -> None:
ASCIIColors.yellow(f"{args.llm_model}")
ASCIIColors.white(" ├─ Max Async for LLM: ", end="")
ASCIIColors.yellow(f"{args.max_async}")
ASCIIColors.white(" ├─ Max Tokens: ", end="")
ASCIIColors.yellow(f"{args.max_tokens}")
ASCIIColors.white(" ├─ Summary Context Size: ", end="")
ASCIIColors.yellow(f"{args.summary_context_size}")
ASCIIColors.white(" ├─ LLM Cache Enabled: ", end="")
ASCIIColors.yellow(f"{args.enable_llm_cache}")
ASCIIColors.white(" └─ LLM Cache for Extraction Enabled: ", end="")

File diff suppressed because one or more lines are too long

File diff suppressed because one or more lines are too long

View file

@ -1,4 +1,4 @@
(Minified webui bundle diff omitted: the content is unchanged except that the asset references were updated from `feature-documents-Di_Wt0BY.js` to `feature-documents-DLarjU2a.js` and from `feature-retrieval-DVuOAaIQ.js` to `feature-retrieval-P5Qspbob.js`.)

View file

@ -8,16 +8,16 @@
<link rel="icon" type="image/png" href="favicon.png" />
<meta name="viewport" content="width=device-width, initial-scale=1.0" />
<title>Lightrag</title>
<script type="module" crossorigin src="/webui/assets/index-B8PWUG__.js"></script>
<script type="module" crossorigin src="/webui/assets/index-DI6XUmEl.js"></script>
<link rel="modulepreload" crossorigin href="/webui/assets/react-vendor-DEwriMA6.js">
<link rel="modulepreload" crossorigin href="/webui/assets/ui-vendor-CeCm8EER.js">
<link rel="modulepreload" crossorigin href="/webui/assets/graph-vendor-B-X5JegA.js">
<link rel="modulepreload" crossorigin href="/webui/assets/utils-vendor-BysuhMZA.js">
<link rel="modulepreload" crossorigin href="/webui/assets/feature-graph-C6IuADHZ.js">
<link rel="modulepreload" crossorigin href="/webui/assets/feature-documents-Di_Wt0BY.js">
<link rel="modulepreload" crossorigin href="/webui/assets/feature-documents-DLarjU2a.js">
<link rel="modulepreload" crossorigin href="/webui/assets/mermaid-vendor-CAxUo7Zk.js">
<link rel="modulepreload" crossorigin href="/webui/assets/markdown-vendor-DmIvJdn7.js">
<link rel="modulepreload" crossorigin href="/webui/assets/feature-retrieval-DVuOAaIQ.js">
<link rel="modulepreload" crossorigin href="/webui/assets/feature-retrieval-P5Qspbob.js">
<link rel="stylesheet" crossorigin href="/webui/assets/feature-graph-BipNuM18.css">
<link rel="stylesheet" crossorigin href="/webui/assets/index-CafJWW1u.css">
</head>

View file

@ -12,9 +12,17 @@ DEFAULT_MAX_GRAPH_NODES = 1000
# Default values for extraction settings
DEFAULT_SUMMARY_LANGUAGE = "English" # Default language for summaries
DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 4
DEFAULT_MAX_GLEANING = 1
DEFAULT_SUMMARY_MAX_TOKENS = 30000 # Default maximum token size
# Number of description fragments to trigger LLM summary
DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 8
# Max description token size to trigger LLM summary
DEFAULT_SUMMARY_MAX_TOKENS = 1200
# Recommended LLM summary output length in tokens
DEFAULT_SUMMARY_LENGTH_RECOMMENDED = 600
# Maximum token size sent to LLM for summary
DEFAULT_SUMMARY_CONTEXT_SIZE = 12000
# Default entities to extract if ENTITY_TYPES is not specified in .env
DEFAULT_ENTITY_TYPES = ["organization", "person", "geo", "event", "category"]
# Separator for graph fields

View file

@ -34,6 +34,8 @@ from lightrag.constants import (
DEFAULT_KG_CHUNK_PICK_METHOD,
DEFAULT_MIN_RERANK_SCORE,
DEFAULT_SUMMARY_MAX_TOKENS,
DEFAULT_SUMMARY_CONTEXT_SIZE,
DEFAULT_SUMMARY_LENGTH_RECOMMENDED,
DEFAULT_MAX_ASYNC,
DEFAULT_MAX_PARALLEL_INSERT,
DEFAULT_MAX_GRAPH_NODES,
@ -286,8 +288,20 @@ class LightRAG:
summary_max_tokens: int = field(
default=int(os.getenv("SUMMARY_MAX_TOKENS", DEFAULT_SUMMARY_MAX_TOKENS))
)
"""Maximum tokens allowed for entity/relation description."""
summary_context_size: int = field(
default=int(os.getenv("SUMMARY_CONTEXT_SIZE", DEFAULT_SUMMARY_CONTEXT_SIZE))
)
"""Maximum number of tokens allowed per LLM response."""
summary_length_recommended: int = field(
default=int(
os.getenv("SUMMARY_LENGTH_RECOMMENDED", DEFAULT_SUMMARY_LENGTH_RECOMMENDED)
)
)
"""Recommended length of LLM summary output."""
llm_model_max_async: int = field(
default=int(os.getenv("MAX_ASYNC", DEFAULT_MAX_ASYNC))
)
@ -418,6 +432,20 @@ class LightRAG:
if self.ollama_server_infos is None:
self.ollama_server_infos = OllamaServerInfos()
# Validate config
if self.force_llm_summary_on_merge < 3:
logger.warning(
f"force_llm_summary_on_merge should be at least 3, got {self.force_llm_summary_on_merge}"
)
if self.summary_context_size > self.max_total_tokens:
logger.warning(
f"summary_context_size({self.summary_context_size}) should no greater than max_total_tokens({self.max_total_tokens})"
)
if self.summary_length_recommended > self.summary_max_tokens:
logger.warning(
f"max_total_tokens({self.summary_max_tokens}) should greater than summary_length_recommended({self.summary_length_recommended})"
)
# Fix global_config now
global_config = asdict(self)
@ -2274,117 +2302,111 @@ class LightRAG:
relationships_to_delete = set()
relationships_to_rebuild = {} # (src, tgt) -> remaining_chunk_ids
# Use graph database lock to ensure atomic merges and updates
try:
# Get affected entities and relations from full_entities and full_relations storage
doc_entities_data = await self.full_entities.get_by_id(doc_id)
doc_relations_data = await self.full_relations.get_by_id(doc_id)
affected_nodes = []
affected_edges = []
# Get entity data from graph storage using entity names from full_entities
if doc_entities_data and "entity_names" in doc_entities_data:
entity_names = doc_entities_data["entity_names"]
# get_nodes_batch returns dict[str, dict], need to convert to list[dict]
nodes_dict = await self.chunk_entity_relation_graph.get_nodes_batch(
entity_names
)
for entity_name in entity_names:
node_data = nodes_dict.get(entity_name)
if node_data:
# Ensure compatibility with existing logic that expects "id" field
if "id" not in node_data:
node_data["id"] = entity_name
affected_nodes.append(node_data)
# Get relation data from graph storage using relation pairs from full_relations
if doc_relations_data and "relation_pairs" in doc_relations_data:
relation_pairs = doc_relations_data["relation_pairs"]
edge_pairs_dicts = [
{"src": pair[0], "tgt": pair[1]} for pair in relation_pairs
]
# get_edges_batch returns dict[tuple[str, str], dict], need to convert to list[dict]
edges_dict = await self.chunk_entity_relation_graph.get_edges_batch(
edge_pairs_dicts
)
for pair in relation_pairs:
src, tgt = pair[0], pair[1]
edge_key = (src, tgt)
edge_data = edges_dict.get(edge_key)
if edge_data:
# Ensure compatibility with existing logic that expects "source" and "target" fields
if "source" not in edge_data:
edge_data["source"] = src
if "target" not in edge_data:
edge_data["target"] = tgt
affected_edges.append(edge_data)
except Exception as e:
logger.error(f"Failed to analyze affected graph elements: {e}")
raise Exception(f"Failed to analyze graph dependencies: {e}") from e
try:
# Process entities
for node_data in affected_nodes:
node_label = node_data.get("entity_id")
if node_label and "source_id" in node_data:
sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
remaining_sources = sources - chunk_ids
if not remaining_sources:
entities_to_delete.add(node_label)
elif remaining_sources != sources:
entities_to_rebuild[node_label] = remaining_sources
async with pipeline_status_lock:
log_message = f"Found {len(entities_to_rebuild)} affected entities"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# Process relationships
for edge_data in affected_edges:
src = edge_data.get("source")
tgt = edge_data.get("target")
if src and tgt and "source_id" in edge_data:
edge_tuple = tuple(sorted((src, tgt)))
if (
edge_tuple in relationships_to_delete
or edge_tuple in relationships_to_rebuild
):
continue
sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
remaining_sources = sources - chunk_ids
if not remaining_sources:
relationships_to_delete.add(edge_tuple)
elif remaining_sources != sources:
relationships_to_rebuild[edge_tuple] = remaining_sources
async with pipeline_status_lock:
log_message = (
f"Found {len(relationships_to_rebuild)} affected relations"
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to process graph analysis results: {e}")
raise Exception(f"Failed to process graph dependencies: {e}") from e
# Use graph database lock to prevent dirty read
graph_db_lock = get_graph_db_lock(enable_logging=False)
async with graph_db_lock:
try:
# Get affected entities and relations from full_entities and full_relations storage
doc_entities_data = await self.full_entities.get_by_id(doc_id)
doc_relations_data = await self.full_relations.get_by_id(doc_id)
affected_nodes = []
affected_edges = []
# Get entity data from graph storage using entity names from full_entities
if doc_entities_data and "entity_names" in doc_entities_data:
entity_names = doc_entities_data["entity_names"]
# get_nodes_batch returns dict[str, dict], need to convert to list[dict]
nodes_dict = (
await self.chunk_entity_relation_graph.get_nodes_batch(
entity_names
)
)
for entity_name in entity_names:
node_data = nodes_dict.get(entity_name)
if node_data:
# Ensure compatibility with existing logic that expects "id" field
if "id" not in node_data:
node_data["id"] = entity_name
affected_nodes.append(node_data)
# Get relation data from graph storage using relation pairs from full_relations
if doc_relations_data and "relation_pairs" in doc_relations_data:
relation_pairs = doc_relations_data["relation_pairs"]
edge_pairs_dicts = [
{"src": pair[0], "tgt": pair[1]} for pair in relation_pairs
]
# get_edges_batch returns dict[tuple[str, str], dict], need to convert to list[dict]
edges_dict = (
await self.chunk_entity_relation_graph.get_edges_batch(
edge_pairs_dicts
)
)
for pair in relation_pairs:
src, tgt = pair[0], pair[1]
edge_key = (src, tgt)
edge_data = edges_dict.get(edge_key)
if edge_data:
# Ensure compatibility with existing logic that expects "source" and "target" fields
if "source" not in edge_data:
edge_data["source"] = src
if "target" not in edge_data:
edge_data["target"] = tgt
affected_edges.append(edge_data)
except Exception as e:
logger.error(f"Failed to analyze affected graph elements: {e}")
raise Exception(f"Failed to analyze graph dependencies: {e}") from e
try:
# Process entities
for node_data in affected_nodes:
node_label = node_data.get("entity_id")
if node_label and "source_id" in node_data:
sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
remaining_sources = sources - chunk_ids
if not remaining_sources:
entities_to_delete.add(node_label)
elif remaining_sources != sources:
entities_to_rebuild[node_label] = remaining_sources
async with pipeline_status_lock:
log_message = (
f"Found {len(entities_to_rebuild)} affected entities"
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# Process relationships
for edge_data in affected_edges:
src = edge_data.get("source")
tgt = edge_data.get("target")
if src and tgt and "source_id" in edge_data:
edge_tuple = tuple(sorted((src, tgt)))
if (
edge_tuple in relationships_to_delete
or edge_tuple in relationships_to_rebuild
):
continue
sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP))
remaining_sources = sources - chunk_ids
if not remaining_sources:
relationships_to_delete.add(edge_tuple)
elif remaining_sources != sources:
relationships_to_rebuild[edge_tuple] = remaining_sources
async with pipeline_status_lock:
log_message = (
f"Found {len(relationships_to_rebuild)} affected relations"
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to process graph analysis results: {e}")
raise Exception(f"Failed to process graph dependencies: {e}") from e
# 5. Delete chunks from storage
if chunk_ids:
try:
@ -2455,27 +2477,28 @@ class LightRAG:
logger.error(f"Failed to delete relationships: {e}")
raise Exception(f"Failed to delete relationships: {e}") from e
# 8. Rebuild entities and relationships from remaining chunks
if entities_to_rebuild or relationships_to_rebuild:
try:
await _rebuild_knowledge_from_chunks(
entities_to_rebuild=entities_to_rebuild,
relationships_to_rebuild=relationships_to_rebuild,
knowledge_graph_inst=self.chunk_entity_relation_graph,
entities_vdb=self.entities_vdb,
relationships_vdb=self.relationships_vdb,
text_chunks_storage=self.text_chunks,
llm_response_cache=self.llm_response_cache,
global_config=asdict(self),
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
)
# Persist changes to graph database before releasing graph database lock
await self._insert_done()
except Exception as e:
logger.error(f"Failed to rebuild knowledge from chunks: {e}")
raise Exception(
f"Failed to rebuild knowledge graph: {e}"
) from e
# 8. Rebuild entities and relationships from remaining chunks
if entities_to_rebuild or relationships_to_rebuild:
try:
await _rebuild_knowledge_from_chunks(
entities_to_rebuild=entities_to_rebuild,
relationships_to_rebuild=relationships_to_rebuild,
knowledge_graph_inst=self.chunk_entity_relation_graph,
entities_vdb=self.entities_vdb,
relationships_vdb=self.relationships_vdb,
text_chunks_storage=self.text_chunks,
llm_response_cache=self.llm_response_cache,
global_config=asdict(self),
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
)
except Exception as e:
logger.error(f"Failed to rebuild knowledge from chunks: {e}")
raise Exception(f"Failed to rebuild knowledge graph: {e}") from e
# 9. Delete from full_entities and full_relations storage
try:

View file

@ -115,48 +115,197 @@ def chunking_by_token_size(
async def _handle_entity_relation_summary(
description_type: str,
entity_or_relation_name: str,
description: str,
description_list: list[str],
seperator: str,
global_config: dict,
llm_response_cache: BaseKVStorage | None = None,
) -> tuple[str, bool]:
"""Handle entity relation description summary using map-reduce approach.
This function summarizes a list of descriptions using a map-reduce strategy:
1. If the list is short (< force_llm_summary_on_merge items) and small (< summary_max_tokens tokens), join the descriptions without calling the LLM
2. If the total token count fits within summary_context_size, summarize with the LLM directly
3. Otherwise, split the descriptions into groups that fit within summary_context_size (map phase)
4. Summarize each group, then recursively process the resulting summaries (reduce phase)
5. Continue until a final summary fits within the limits or the number of descriptions drops below force_llm_summary_on_merge
Args:
description_type: "Entity" or "Relation", used to label the summary prompt
entity_or_relation_name: Name of the entity or relation being summarized
description_list: List of description strings to summarize
seperator: Separator used when joining descriptions without an LLM call
global_config: Global configuration containing tokenizer and limits
llm_response_cache: Optional cache for LLM responses
Returns:
Tuple of (final_summarized_description_string, llm_was_used_boolean)
"""
# Handle empty input
if not description_list:
return "", False
# If only one description, return it directly (no need for LLM call)
if len(description_list) == 1:
return description_list[0], False
# Get configuration
tokenizer: Tokenizer = global_config["tokenizer"]
summary_context_size = global_config["summary_context_size"]
summary_max_tokens = global_config["summary_max_tokens"]
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
current_list = description_list[:] # Copy the list to avoid modifying original
llm_was_used = False # Track whether LLM was used during the entire process
# Iterative map-reduce process
while True:
# Calculate total tokens in current list
total_tokens = sum(len(tokenizer.encode(desc)) for desc in current_list)
# If total length is within limits, perform final summarization
if total_tokens <= summary_context_size or len(current_list) <= 2:
if (
len(current_list) < force_llm_summary_on_merge
and total_tokens < summary_max_tokens
):
# no LLM needed, just join the descriptions
final_description = seperator.join(current_list)
return final_description if final_description else "", llm_was_used
else:
if total_tokens > summary_context_size and len(current_list) <= 2:
logger.warning(
f"Summarizing {entity_or_relation_name}: Oversize descpriton found"
)
# Final summarization of remaining descriptions - LLM will be used
final_summary = await _summarize_descriptions(
description_type,
entity_or_relation_name,
current_list,
global_config,
llm_response_cache,
)
return final_summary, True # LLM was used for final summarization
# Need to split into chunks - Map phase
# Ensure each chunk has minimum 2 descriptions to guarantee progress
chunks = []
current_chunk = []
current_tokens = 0
# At this point current_list holds at least 3 descriptions
for i, desc in enumerate(current_list):
desc_tokens = len(tokenizer.encode(desc))
# If adding current description would exceed limit, finalize current chunk
if current_tokens + desc_tokens > summary_context_size and current_chunk:
# Ensure we have at least 2 descriptions in the chunk (when possible)
if len(current_chunk) == 1:
# Force add one more description to ensure minimum 2 per chunk
current_chunk.append(desc)
chunks.append(current_chunk)
logger.warning(
f"Summarizing {entity_or_relation_name}: Oversize descpriton found"
)
current_chunk = [] # next group is empty
current_tokens = 0
else: # current_chunk is ready for summary in reduce phase
chunks.append(current_chunk)
current_chunk = [desc] # leave it for next group
current_tokens = desc_tokens
else:
current_chunk.append(desc)
current_tokens += desc_tokens
# Add the last chunk if it exists
if current_chunk:
chunks.append(current_chunk)
logger.info(
f" Summarizing {entity_or_relation_name}: Map {len(current_list)} descriptions into {len(chunks)} groups"
)
# Reduce phase: summarize each group from chunks
new_summaries = []
for chunk in chunks:
if len(chunk) == 1:
# Optimization: single description chunks don't need LLM summarization
new_summaries.append(chunk[0])
else:
# Multiple descriptions need LLM summarization
summary = await _summarize_descriptions(
description_type,
entity_or_relation_name,
chunk,
global_config,
llm_response_cache,
)
new_summaries.append(summary)
llm_was_used = True # Mark that LLM was used in reduce phase
# Update current list with new summaries for next iteration
current_list = new_summaries
async def _summarize_descriptions(
description_type: str,
description_name: str,
description_list: list[str],
global_config: dict,
llm_response_cache: BaseKVStorage | None = None,
) -> str:
"""Handle entity relation summary
For each entity or relation, input is the combined description of already existing description and new description.
If too long, use LLM to summarize.
"""Helper function to summarize a list of descriptions using LLM.
Args:
description_type: "Entity" or "Relation", used to label the prompt
description_name: Name of the entity or relation being summarized
description_list: List of description strings to summarize
global_config: Global configuration containing LLM function and settings
llm_response_cache: Optional cache for LLM responses
Returns:
Summarized description string
"""
use_llm_func: callable = global_config["llm_model_func"]
# Apply higher priority (8) to entity/relation summary tasks
use_llm_func = partial(use_llm_func, _priority=8)
tokenizer: Tokenizer = global_config["tokenizer"]
llm_max_tokens = global_config["summary_max_tokens"]
language = global_config["addon_params"].get(
"language", PROMPTS["DEFAULT_LANGUAGE"]
)
tokens = tokenizer.encode(description)
### summarize is not determined here anymore (It's determined by num_fragment now)
# if len(tokens) < summary_max_tokens: # No need for summary
# return description
summary_length_recommended = global_config["summary_length_recommended"]
prompt_template = PROMPTS["summarize_entity_descriptions"]
use_description = tokenizer.decode(tokens[:llm_max_tokens])
# Join descriptions and apply token-based truncation if necessary
joined_descriptions = "\n\n".join(description_list)
tokenizer = global_config["tokenizer"]
summary_context_size = global_config["summary_context_size"]
# Token-based truncation to ensure input fits within limits
tokens = tokenizer.encode(joined_descriptions)
if len(tokens) > summary_context_size:
truncated_tokens = tokens[:summary_context_size]
joined_descriptions = tokenizer.decode(truncated_tokens)
# Prepare context for the prompt
context_base = dict(
entity_name=entity_or_relation_name,
description_list=use_description.split(GRAPH_FIELD_SEP),
description_type=description_type,
description_name=description_name,
description_list=joined_descriptions,
summary_length=summary_length_recommended,
language=language,
)
use_prompt = prompt_template.format(**context_base)
logger.debug(f"Trigger summary: {entity_or_relation_name}")
logger.debug(
f"Summarizing {len(description_list)} descriptions for: {description_name}"
)
# Use LLM function with cache (higher priority for summary generation)
summary = await use_llm_func_with_cache(
use_prompt,
use_llm_func,
llm_response_cache=llm_response_cache,
# max_tokens=summary_max_tokens,
cache_type="extract",
)
return summary
@ -414,7 +563,7 @@ async def _rebuild_knowledge_from_chunks(
)
rebuilt_entities_count += 1
status_message = (
f"Rebuilt entity: {entity_name} from {len(chunk_ids)} chunks"
f"Rebuilt `{entity_name}` from {len(chunk_ids)} chunks"
)
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
@ -423,7 +572,7 @@ async def _rebuild_knowledge_from_chunks(
pipeline_status["history_messages"].append(status_message)
except Exception as e:
failed_entities_count += 1
status_message = f"Failed to rebuild entity {entity_name}: {e}"
status_message = f"Failed to rebuild `{entity_name}`: {e}"
logger.info(status_message) # Per requirement, change to info
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
@ -454,7 +603,9 @@ async def _rebuild_knowledge_from_chunks(
global_config=global_config,
)
rebuilt_relationships_count += 1
status_message = f"Rebuilt relationship: {src}->{tgt} from {len(chunk_ids)} chunks"
status_message = (
f"Rebuilt `{src} - {tgt}` from {len(chunk_ids)} chunks"
)
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
@ -462,7 +613,7 @@ async def _rebuild_knowledge_from_chunks(
pipeline_status["history_messages"].append(status_message)
except Exception as e:
failed_relationships_count += 1
status_message = f"Failed to rebuild relationship {src}->{tgt}: {e}"
status_message = f"Failed to rebuild `{src} - {tgt}`: {e}"
logger.info(status_message) # Per requirement, change to info
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
@ -526,14 +677,20 @@ async def _get_cached_extraction_results(
) -> dict[str, list[str]]:
"""Get cached extraction results for specific chunk IDs
This function retrieves cached LLM extraction results for the given chunk IDs and returns
them sorted by creation time. The results are sorted at two levels:
1. Individual extraction results within each chunk are sorted by create_time (earliest first)
2. Chunks themselves are sorted by the create_time of their earliest extraction result
Args:
llm_response_cache: LLM response cache storage
chunk_ids: Set of chunk IDs to get cached results for
text_chunks_data: Pre-loaded chunk data (optional, for performance)
text_chunks_storage: Text chunks storage (fallback if text_chunks_data is None)
text_chunks_storage: Text chunks storage for retrieving chunk data and LLM cache references
Returns:
Dict mapping chunk_id -> list of extraction_result_text
Dict mapping chunk_id -> list of extraction_result_text, where:
- Keys (chunk_ids) are ordered by the create_time of their first extraction result
- Values (extraction results) are ordered by create_time within each chunk
"""
cached_results = {}
@ -542,15 +699,13 @@ async def _get_cached_extraction_results(
# Read from storage
chunk_data_list = await text_chunks_storage.get_by_ids(list(chunk_ids))
for chunk_id, chunk_data in zip(chunk_ids, chunk_data_list):
for chunk_data in chunk_data_list:
if chunk_data and isinstance(chunk_data, dict):
llm_cache_list = chunk_data.get("llm_cache_list", [])
if llm_cache_list:
all_cache_ids.update(llm_cache_list)
else:
logger.warning(
f"Chunk {chunk_id} data is invalid or None: {type(chunk_data)}"
)
logger.warning(f"Chunk data is invalid or None: {chunk_data}")
if not all_cache_ids:
logger.warning(f"No LLM cache IDs found for {len(chunk_ids)} chunk IDs")
@ -561,7 +716,7 @@ async def _get_cached_extraction_results(
# Process cache entries and group by chunk_id
valid_entries = 0
for cache_id, cache_entry in zip(all_cache_ids, cache_data_list):
for cache_entry in cache_data_list:
if (
cache_entry is not None
and isinstance(cache_entry, dict)
@ -581,16 +736,30 @@ async def _get_cached_extraction_results(
# Store tuple with extraction result and creation time for sorting
cached_results[chunk_id].append((extraction_result, create_time))
# Sort extraction results by create_time for each chunk
# Sort extraction results by create_time for each chunk and collect earliest times
chunk_earliest_times = {}
for chunk_id in cached_results:
# Sort by create_time (x[1]), then extract only extraction_result (x[0])
cached_results[chunk_id].sort(key=lambda x: x[1])
# Store the earliest create_time for this chunk (first item after sorting)
chunk_earliest_times[chunk_id] = cached_results[chunk_id][0][1]
# Extract only extraction_result (x[0])
cached_results[chunk_id] = [item[0] for item in cached_results[chunk_id]]
logger.info(
f"Found {valid_entries} valid cache entries, {len(cached_results)} chunks with results"
# Sort cached_results by the earliest create_time of each chunk
sorted_chunk_ids = sorted(
chunk_earliest_times.keys(), key=lambda chunk_id: chunk_earliest_times[chunk_id]
)
return cached_results
# Rebuild cached_results in sorted order
sorted_cached_results = {}
for chunk_id in sorted_chunk_ids:
sorted_cached_results[chunk_id] = cached_results[chunk_id]
logger.info(
f"Found {valid_entries} valid cache entries, {len(sorted_cached_results)} chunks with results"
)
return sorted_cached_results
async def _parse_extraction_result(
@ -691,15 +860,6 @@ async def _rebuild_single_entity(
# Update entity in vector database
entity_vdb_id = compute_mdhash_id(entity_name, prefix="ent-")
# Delete old vector record first
try:
await entities_vdb.delete([entity_vdb_id])
except Exception as e:
logger.debug(
f"Could not delete old entity vector record {entity_vdb_id}: {e}"
)
# Insert new vector record
entity_content = f"{entity_name}\n{final_description}"
await entities_vdb.upsert(
{
@ -714,21 +874,6 @@ async def _rebuild_single_entity(
}
)
# Helper function to generate final description with optional LLM summary
async def _generate_final_description(combined_description: str) -> str:
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1
if num_fragment >= force_llm_summary_on_merge:
return await _handle_entity_relation_summary(
entity_name,
combined_description,
global_config,
llm_response_cache=llm_response_cache,
)
else:
return combined_description
# Collect all entity data from relevant chunks
all_entity_data = []
for chunk_id in chunk_ids:
@ -737,13 +882,13 @@ async def _rebuild_single_entity(
if not all_entity_data:
logger.warning(
f"No cached entity data found for {entity_name}, trying to rebuild from relationships"
f"No entity data found for `{entity_name}`, trying to rebuild from relationships"
)
# Get all edges connected to this entity
edges = await knowledge_graph_inst.get_node_edges(entity_name)
if not edges:
logger.warning(f"No relationships found for entity {entity_name}")
logger.warning(f"No relations attached to entity `{entity_name}`")
return
# Collect relationship data to extract entity information
@ -761,10 +906,19 @@ async def _rebuild_single_entity(
edge_file_paths = edge_data["file_path"].split(GRAPH_FIELD_SEP)
file_paths.update(edge_file_paths)
# Generate description from relationships or fallback to current
if relationship_descriptions:
combined_description = GRAPH_FIELD_SEP.join(relationship_descriptions)
final_description = await _generate_final_description(combined_description)
# deduplicate descriptions
description_list = list(dict.fromkeys(relationship_descriptions))
# Generate final description from relationships or fallback to current
if description_list:
final_description, _ = await _handle_entity_relation_summary(
"Entity",
entity_name,
description_list,
GRAPH_FIELD_SEP,
global_config,
llm_response_cache=llm_response_cache,
)
else:
final_description = current_entity.get("description", "")
@ -785,12 +939,9 @@ async def _rebuild_single_entity(
if entity_data.get("file_path"):
file_paths.add(entity_data["file_path"])
# Combine all descriptions
combined_description = (
GRAPH_FIELD_SEP.join(descriptions)
if descriptions
else current_entity.get("description", "")
)
# Remove duplicates while preserving order
description_list = list(dict.fromkeys(descriptions))
entity_types = list(dict.fromkeys(entity_types))
# Get most common entity type
entity_type = (
@ -799,8 +950,19 @@ async def _rebuild_single_entity(
else current_entity.get("entity_type", "UNKNOWN")
)
# Generate final description and update storage
final_description = await _generate_final_description(combined_description)
# Generate final description from entities or fallback to current
if description_list:
final_description, _ = await _handle_entity_relation_summary(
"Entity",
entity_name,
description_list,
GRAPH_FIELD_SEP,
global_config,
llm_response_cache=llm_response_cache,
)
else:
final_description = current_entity.get("description", "")
await _update_entity_storage(final_description, entity_type, file_paths)
@ -837,7 +999,7 @@ async def _rebuild_single_relationship(
)
if not all_relationship_data:
logger.warning(f"No cached relationship data found for {src}-{tgt}")
logger.warning(f"No relation data found for `{src}-{tgt}`")
return
# Merge descriptions and keywords
@ -856,42 +1018,38 @@ async def _rebuild_single_relationship(
if rel_data.get("file_path"):
file_paths.add(rel_data["file_path"])
# Combine descriptions and keywords
combined_description = (
GRAPH_FIELD_SEP.join(descriptions)
if descriptions
else current_relationship.get("description", "")
)
# Remove duplicates while preserving order
description_list = list(dict.fromkeys(descriptions))
keywords = list(dict.fromkeys(keywords))
combined_keywords = (
", ".join(set(keywords))
if keywords
else current_relationship.get("keywords", "")
)
# weight = (
# sum(weights) / len(weights)
# if weights
# else current_relationship.get("weight", 1.0)
# )
weight = sum(weights) if weights else current_relationship.get("weight", 1.0)
# Use summary if description has too many fragments
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
num_fragment = combined_description.count(GRAPH_FIELD_SEP) + 1
if num_fragment >= force_llm_summary_on_merge:
final_description = await _handle_entity_relation_summary(
# Generate final description from relations or fallback to current
if description_list:
final_description, _ = await _handle_entity_relation_summary(
"Relation",
f"{src}-{tgt}",
combined_description,
description_list,
GRAPH_FIELD_SEP,
global_config,
llm_response_cache=llm_response_cache,
)
else:
final_description = combined_description
# fallback to keep current(unchanged)
final_description = current_relationship.get("description", "")
# Update relationship in graph storage
updated_relationship_data = {
**current_relationship,
"description": final_description,
"description": final_description
if final_description
else current_relationship.get("description", ""),
"keywords": combined_keywords,
"weight": weight,
"source_id": GRAPH_FIELD_SEP.join(chunk_ids),
@ -949,13 +1107,9 @@ async def _merge_nodes_then_upsert(
already_node = await knowledge_graph_inst.get_node(entity_name)
if already_node:
already_entity_types.append(already_node["entity_type"])
already_source_ids.extend(
split_string_by_multi_markers(already_node["source_id"], [GRAPH_FIELD_SEP])
)
already_file_paths.extend(
split_string_by_multi_markers(already_node["file_path"], [GRAPH_FIELD_SEP])
)
already_description.append(already_node["description"])
already_source_ids.extend(already_node["source_id"].split(GRAPH_FIELD_SEP))
already_file_paths.extend(already_node["file_path"].split(GRAPH_FIELD_SEP))
already_description.extend(already_node["description"].split(GRAPH_FIELD_SEP))
entity_type = sorted(
Counter(
@ -963,42 +1117,54 @@ async def _merge_nodes_then_upsert(
).items(),
key=lambda x: x[1],
reverse=True,
)[0][0]
description = GRAPH_FIELD_SEP.join(
sorted(set([dp["description"] for dp in nodes_data] + already_description))
)[0][0] # Get the entity type with the highest count
# merge and deduplicate description
description_list = list(
dict.fromkeys(
already_description
+ [dp["description"] for dp in nodes_data if dp.get("description")]
)
)
num_fragment = len(description_list)
already_fragment = len(already_description)
deduplicated_num = already_fragment + len(nodes_data) - num_fragment
if deduplicated_num > 0:
dd_message = f"(dd:{deduplicated_num})"
else:
dd_message = ""
if num_fragment > 0:
# Get summary and LLM usage status
description, llm_was_used = await _handle_entity_relation_summary(
"Entity",
entity_name,
description_list,
GRAPH_FIELD_SEP,
global_config,
llm_response_cache,
)
# Log based on actual LLM usage
if llm_was_used:
status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
else:
status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
else:
logger.error(f"Entity {entity_name} has no description")
description = "(no description)"
source_id = GRAPH_FIELD_SEP.join(
set([dp["source_id"] for dp in nodes_data] + already_source_ids)
)
file_path = build_file_path(already_file_paths, nodes_data, entity_name)
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
num_fragment = description.count(GRAPH_FIELD_SEP) + 1
num_new_fragment = len(set([dp["description"] for dp in nodes_data]))
if num_fragment > 1:
if num_fragment >= force_llm_summary_on_merge:
status_message = f"LLM merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
description = await _handle_entity_relation_summary(
entity_name,
description,
global_config,
llm_response_cache,
)
else:
status_message = f"Merge N: {entity_name} | {num_new_fragment}+{num_fragment-num_new_fragment}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
node_data = dict(
entity_id=entity_name,
entity_type=entity_type,
@ -1045,22 +1211,20 @@ async def _merge_edges_then_upsert(
# Get source_id with empty string default if missing or None
if already_edge.get("source_id") is not None:
already_source_ids.extend(
split_string_by_multi_markers(
already_edge["source_id"], [GRAPH_FIELD_SEP]
)
already_edge["source_id"].split(GRAPH_FIELD_SEP)
)
# Get file_path with empty string default if missing or None
if already_edge.get("file_path") is not None:
already_file_paths.extend(
split_string_by_multi_markers(
already_edge["file_path"], [GRAPH_FIELD_SEP]
)
already_edge["file_path"].split(GRAPH_FIELD_SEP)
)
# Get description with empty string default if missing or None
if already_edge.get("description") is not None:
already_description.append(already_edge["description"])
already_description.extend(
already_edge["description"].split(GRAPH_FIELD_SEP)
)
# Get keywords with empty string default if missing or None
if already_edge.get("keywords") is not None:
@ -1072,15 +1236,47 @@ async def _merge_edges_then_upsert(
# Process edges_data with None checks
weight = sum([dp["weight"] for dp in edges_data] + already_weights)
description = GRAPH_FIELD_SEP.join(
sorted(
set(
[dp["description"] for dp in edges_data if dp.get("description")]
+ already_description
)
description_list = list(
dict.fromkeys(
already_description
+ [dp["description"] for dp in edges_data if dp.get("description")]
)
)
num_fragment = len(description_list)
already_fragment = len(already_description)
deduplicated_num = already_fragment + len(edges_data) - num_fragment
if deduplicated_num > 0:
dd_message = f"(dd:{deduplicated_num})"
else:
dd_message = ""
if num_fragment > 0:
# Get summary and LLM usage status
description, llm_was_used = await _handle_entity_relation_summary(
"Relation",
f"({src_id}, {tgt_id})",
description_list,
GRAPH_FIELD_SEP,
global_config,
llm_response_cache,
)
# Log based on actual LLM usage
if llm_was_used:
status_message = f"LLMmrg: `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
else:
status_message = f"Merged: `{src_id} - {tgt_id}` | {already_fragment}+{num_fragment-already_fragment}{dd_message}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
else:
logger.error(f"Edge {src_id} - {tgt_id} has no description")
description = "(no description)"
# Split all existing and new keywords into individual terms, then combine and deduplicate
all_keywords = set()
# Process already_keywords (which are comma-separated)
@ -1128,35 +1324,6 @@ async def _merge_edges_then_upsert(
}
added_entities.append(entity_data)
force_llm_summary_on_merge = global_config["force_llm_summary_on_merge"]
num_fragment = description.count(GRAPH_FIELD_SEP) + 1
num_new_fragment = len(
set([dp["description"] for dp in edges_data if dp.get("description")])
)
if num_fragment > 1:
if num_fragment >= force_llm_summary_on_merge:
status_message = f"LLM merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
description = await _handle_entity_relation_summary(
f"({src_id}, {tgt_id})",
description,
global_config,
llm_response_cache,
)
else:
status_message = f"Merge E: {src_id} - {tgt_id} | {num_new_fragment}+{num_fragment-num_new_fragment}"
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
await knowledge_graph_inst.upsert_edge(
src_id,
tgt_id,
@ -1464,7 +1631,7 @@ async def merge_nodes_and_edges(
)
# Don't raise exception to avoid affecting main flow
log_message = f"Completed merging: {len(processed_entities)} entities, {len(all_added_entities)} added entities, {len(processed_edges)} relations"
log_message = f"Completed merging: {len(processed_entities)} entities, {len(all_added_entities)} extra entities, {len(processed_edges)} relations"
logger.info(log_message)
async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message
@ -2058,6 +2225,7 @@ async def _get_vector_context(
results = await chunks_vdb.query(query, top_k=search_top_k, ids=query_param.ids)
if not results:
logger.info(f"Naive query: 0 chunks (chunk_top_k: {search_top_k})")
return []
valid_chunks = []
@ -2180,6 +2348,8 @@ async def _build_query_context(
"frequency": 1, # Vector chunks always have frequency 1
"order": i + 1, # 1-based order in vector search results
}
else:
logger.warning(f"Vector chunk missing chunk_id: {chunk}")
# Use round-robin merge to combine local and global data fairly
final_entities = []
@ -2465,6 +2635,7 @@ async def _build_query_context(
# Apply token processing to merged chunks
text_units_context = []
truncated_chunks = []
if merged_chunks:
# Calculate dynamic token limit for text chunks
entities_str = json.dumps(entities_context, ensure_ascii=False)
@ -2496,15 +2667,15 @@ async def _build_query_context(
kg_context_tokens = len(tokenizer.encode(kg_context))
# Calculate actual system prompt overhead dynamically
# 1. Calculate conversation history tokens
# 1. Conversation history not included in context length calculation
history_context = ""
if query_param.conversation_history:
history_context = get_conversation_turns(
query_param.conversation_history, query_param.history_turns
)
history_tokens = (
len(tokenizer.encode(history_context)) if history_context else 0
)
# if query_param.conversation_history:
# history_context = get_conversation_turns(
# query_param.conversation_history, query_param.history_turns
# )
# history_tokens = (
# len(tokenizer.encode(history_context)) if history_context else 0
# )
# 2. Calculate system prompt template tokens (excluding context_data)
user_prompt = query_param.user_prompt if query_param.user_prompt else ""
@ -2539,7 +2710,7 @@ async def _build_query_context(
available_chunk_tokens = max_total_tokens - used_tokens
logger.debug(
f"Token allocation - Total: {max_total_tokens}, History: {history_tokens}, SysPrompt: {sys_prompt_overhead}, KG: {kg_context_tokens}, Buffer: {buffer_tokens}, Available for chunks: {available_chunk_tokens}"
f"Token allocation - Total: {max_total_tokens}, SysPrompt: {sys_prompt_overhead}, KG: {kg_context_tokens}, Buffer: {buffer_tokens}, Available for chunks: {available_chunk_tokens}"
)
# Apply token truncation to chunks using the dynamic limit

View file

@ -38,22 +38,19 @@ Format the content-level key words as ("content_keywords"{tuple_delimiter}<high_
5. When finished, output {completion_delimiter}
######################
---Examples---
######################
{examples}
#############################
---Real Data---
######################
Entity_types: [{entity_types}]
Text:
{input_text}
######################
---Output---
Output:"""
PROMPTS["entity_extraction_examples"] = [
"""Example 1:
"""------Example 1------
Entity_types: [person, technology, mission, organization, location]
Text:
@ -79,8 +76,9 @@ Output:
("relationship"{tuple_delimiter}"Jordan"{tuple_delimiter}"Cruz"{tuple_delimiter}"Jordan's commitment to discovery is in rebellion against Cruz's vision of control and order."{tuple_delimiter}"ideological conflict, rebellion"{tuple_delimiter}5){record_delimiter}
("relationship"{tuple_delimiter}"Taylor"{tuple_delimiter}"The Device"{tuple_delimiter}"Taylor shows reverence towards the device, indicating its importance and potential impact."{tuple_delimiter}"reverence, technological significance"{tuple_delimiter}9){record_delimiter}
("content_keywords"{tuple_delimiter}"power dynamics, ideological conflict, discovery, rebellion"){completion_delimiter}
#############################""",
"""Example 2:
""",
"""------Example 2------
Entity_types: [company, index, commodity, market_trend, economic_policy, biological]
Text:
@ -107,8 +105,9 @@ Output:
("relationship"{tuple_delimiter}"Gold Futures"{tuple_delimiter}"Market Selloff"{tuple_delimiter}"Gold prices rose as investors sought safe-haven assets during the market selloff."{tuple_delimiter}"market reaction, safe-haven investment"{tuple_delimiter}10){record_delimiter}
("relationship"{tuple_delimiter}"Federal Reserve Policy Announcement"{tuple_delimiter}"Market Selloff"{tuple_delimiter}"Speculation over Federal Reserve policy changes contributed to market volatility and investor selloff."{tuple_delimiter}"interest rate impact, financial regulation"{tuple_delimiter}7){record_delimiter}
("content_keywords"{tuple_delimiter}"market downturn, investor sentiment, commodities, Federal Reserve, stock performance"){completion_delimiter}
#############################""",
"""Example 3:
""",
"""------Example 3------
Entity_types: [economic_policy, athlete, event, location, record, organization, equipment]
Text:
@ -128,23 +127,29 @@ Output:
("relationship"{tuple_delimiter}"Noah Carter"{tuple_delimiter}"Carbon-Fiber Spikes"{tuple_delimiter}"Noah Carter used carbon-fiber spikes to enhance performance during the race."{tuple_delimiter}"athletic equipment, performance boost"{tuple_delimiter}7){record_delimiter}
("relationship"{tuple_delimiter}"World Athletics Federation"{tuple_delimiter}"100m Sprint Record"{tuple_delimiter}"The World Athletics Federation is responsible for validating and recognizing new sprint records."{tuple_delimiter}"sports regulation, record certification"{tuple_delimiter}9){record_delimiter}
("content_keywords"{tuple_delimiter}"athletics, sprinting, record-breaking, sports technology, competition"){completion_delimiter}
#############################""",
""",
]
PROMPTS[
"summarize_entity_descriptions"
] = """You are a helpful assistant responsible for generating a comprehensive summary of the data provided below.
Given one or two entities, and a list of descriptions, all related to the same entity or group of entities.
Please concatenate all of these into a single, comprehensive description. Make sure to include information collected from all the descriptions.
If the provided descriptions are contradictory, please resolve the contradictions and provide a single, coherent summary.
Make sure it is written in third person, and include the entity names so we have the full context.
Use {language} as output language.
PROMPTS["summarize_entity_descriptions"] = """---Role---
You are a Knowledge Graph Specialist responsible for data curation and synthesis.
---Task---
Your task is to synthesize a list of descriptions of a given entity or relation into a single, comprehensive, and cohesive summary.
---Instructions---
1. **Comprehensiveness:** The summary must integrate key information from all provided descriptions. Do not omit important facts.
2. **Context:** The summary must explicitly mention the name of the entity or relation for full context.
3. **Style:** The output must be written from an objective, third-person perspective.
4. **Length:** Maintain depth and completeness while ensuring the summary's length does not exceed {summary_length} tokens.
5. **Language:** The entire output must be written in {language}.
#######
---Data---
Entities: {entity_name}
Description List: {description_list}
#######
{description_type} Name: {description_name}
Description List:
{description_list}
---Output---
Output:
"""
@ -186,8 +191,7 @@ PROMPTS["entity_if_loop_extraction"] = """
It appears some entities may have still been missed.
---Output---
Answer ONLY by `YES` OR `NO` if there are still entities that need to be added.
Output:
""".strip()
PROMPTS["fail_response"] = (
@ -209,7 +213,7 @@ Generate a concise response based on Knowledge Base and follow Response Rules, c
---Knowledge Graph and Document Chunks---
{context_data}
---RESPONSE GUIDELINES---
---Response Guidelines---
**1. Content & Adherence:**
- Strictly adhere to the provided context from the Knowledge Base. Do not invent, assume, or include any information not present in the source data.
- If the answer cannot be found in the provided context, state that you do not have enough information to answer.
@ -231,8 +235,8 @@ Generate a concise response based on Knowledge Base and follow Response Rules, c
---USER CONTEXT---
- Additional user prompt: {user_prompt}
Response:"""
---Response---
Output:"""
PROMPTS["keywords_extraction"] = """---Role---
You are an expert keyword extractor, specializing in analyzing user queries for a Retrieval-Augmented Generation (RAG) system. Your purpose is to identify both high-level and low-level keywords in the user's query that will be used for effective document retrieval.
@ -255,7 +259,7 @@ Given a user query, your task is to extract two distinct types of keywords:
User Query: {query}
---Output---
"""
Output:"""
PROMPTS["keywords_extraction_examples"] = [
"""Example 1:
@ -325,5 +329,5 @@ Generate a concise response based on Document Chunks and follow Response Rules,
---USER CONTEXT---
- Additional user prompt: {user_prompt}
Response:"""
---Response---
Output:"""

View file

@ -35,7 +35,6 @@ export type LightragStatus = {
embedding_binding: string
embedding_binding_host: string
embedding_model: string
max_tokens: number
kv_storage: string
doc_status_storage: string
graph_storage: string

View file

@ -183,7 +183,7 @@ export default function DocumentManager() {
const setDocumentsPageSize = useSettingsStore.use.setDocumentsPageSize()
// New pagination state
const [, setCurrentPageDocs] = useState<DocStatusResponse[]>([])
const [currentPageDocs, setCurrentPageDocs] = useState<DocStatusResponse[]>([])
const [pagination, setPagination] = useState<PaginationInfo>({
page: 1,
page_size: documentsPageSize,
@ -292,6 +292,16 @@ export default function DocumentManager() {
type DocStatusWithStatus = DocStatusResponse & { status: DocStatus };
const filteredAndSortedDocs = useMemo(() => {
// Use currentPageDocs directly if available (from paginated API)
// This preserves the backend's sort order and prevents status grouping
if (currentPageDocs && currentPageDocs.length > 0) {
return currentPageDocs.map(doc => ({
...doc,
status: doc.status as DocStatus
})) as DocStatusWithStatus[];
}
// Fallback to legacy docs structure for backward compatibility
if (!docs) return null;
// Create a flat array of documents with status information
@ -324,7 +334,7 @@ export default function DocumentManager() {
}
return allDocuments;
}, [docs, sortField, sortDirection, statusFilter, sortDocuments]);
}, [currentPageDocs, docs, sortField, sortDirection, statusFilter, sortDocuments]);
// Calculate current page selection state (after filteredAndSortedDocs is defined)
const currentPageDocIds = useMemo(() => {

View file

@ -25,7 +25,6 @@ dependencies = [
"configparser",
"dotenv",
"future",
"json-repair",
"nano-vectordb",
"networkx",
"numpy",
@ -47,6 +46,8 @@ api = [
"configparser",
"dotenv",
"future",
"nano-vectordb",
"networkx",
"numpy",
"openai",
"pandas>=2.0.0",