This commit is contained in:
Raphaël MANSUY 2025-12-04 19:18:36 +08:00
parent 5b207db37e
commit b38177de80
5 changed files with 1009 additions and 167 deletions

View file

@ -23,7 +23,7 @@ WEBUI_DESCRIPTION="Simple and Fast Graph Based RAG System"
# WORKING_DIR=<absolute_path_for_working_dir> # WORKING_DIR=<absolute_path_for_working_dir>
### Tiktoken cache directory (Store cached files in this folder for offline deployment) ### Tiktoken cache directory (Store cached files in this folder for offline deployment)
# TIKTOKEN_CACHE_DIR=./temp/tiktoken # TIKTOKEN_CACHE_DIR=/app/data/tiktoken
### Ollama Emulating Model and Tag ### Ollama Emulating Model and Tag
# OLLAMA_EMULATING_MODEL_NAME=lightrag # OLLAMA_EMULATING_MODEL_NAME=lightrag
@ -73,8 +73,14 @@ ENABLE_LLM_CACHE=true
# MAX_RELATION_TOKENS=8000 # MAX_RELATION_TOKENS=8000
### control the maximum tokens send to LLM (include entities, relations and chunks) ### control the maximum tokens send to LLM (include entities, relations and chunks)
# MAX_TOTAL_TOKENS=30000 # MAX_TOTAL_TOKENS=30000
### control the maximum chunk_ids stored
# MAX_SOURCE_IDS_PER_ENTITY=500 ### control the maximum chunk_ids stored in vector and graph db
# MAX_SOURCE_IDS_PER_ENTITY=300
# MAX_SOURCE_IDS_PER_RELATION=300
### control chunk_ids limitation method: KEEP, FIFO (KEPP: Ingore New Chunks, FIFO: New chunks replace old chunks)
# SOURCE_IDS_LIMIT_METHOD=KEEP
### Maximum number of file paths stored in entity/relation file_path field
# MAX_FILE_PATHS=30
### maximum number of related chunks per source entity or relation ### maximum number of related chunks per source entity or relation
### The chunk picker uses this value to determine the total number of chunks selected from KG(knowledge graph) ### The chunk picker uses this value to determine the total number of chunks selected from KG(knowledge graph)

View file

@ -13,7 +13,6 @@ DEFAULT_MAX_GRAPH_NODES = 1000
# Default values for extraction settings # Default values for extraction settings
DEFAULT_SUMMARY_LANGUAGE = "English" # Default language for document processing DEFAULT_SUMMARY_LANGUAGE = "English" # Default language for document processing
DEFAULT_MAX_GLEANING = 1 DEFAULT_MAX_GLEANING = 1
DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 500 # Applies to Both Graph + Vector DBs
# Number of description fragments to trigger LLM summary # Number of description fragments to trigger LLM summary
DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 8 DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 8
@ -58,8 +57,24 @@ DEFAULT_HISTORY_TURNS = 0
DEFAULT_MIN_RERANK_SCORE = 0.0 DEFAULT_MIN_RERANK_SCORE = 0.0
DEFAULT_RERANK_BINDING = "null" DEFAULT_RERANK_BINDING = "null"
# File path configuration for vector and graph database(Should not be changed, used in Milvus Schema) # Default source ids limit in meta data for entity and relation
DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 3
DEFAULT_MAX_SOURCE_IDS_PER_RELATION = 3
SOURCE_IDS_LIMIT_METHOD_KEEP = "KEEP"
SOURCE_IDS_LIMIT_METHOD_FIFO = "FIFO"
DEFAULT_SOURCE_IDS_LIMIT_METHOD = SOURCE_IDS_LIMIT_METHOD_KEEP
VALID_SOURCE_IDS_LIMIT_METHODS = {
SOURCE_IDS_LIMIT_METHOD_KEEP,
SOURCE_IDS_LIMIT_METHOD_FIFO,
}
# Default file_path limit in meta data for entity and relation
DEFAULT_MAX_FILE_PATHS = 2
# Field length of file_path in Milvus Schema for entity and relation (Should not be changed)
# file_path must store all file paths up to the DEFAULT_MAX_FILE_PATHS limit within the metadata.
DEFAULT_MAX_FILE_PATH_LENGTH = 32768 DEFAULT_MAX_FILE_PATH_LENGTH = 32768
# Placeholder for more file paths in meta data for entity and relation (Should not be changed)
DEFAULT_FILE_PATH_MORE_PLACEHOLDER = "truncated"
# Default temperature for LLM # Default temperature for LLM
DEFAULT_TEMPERATURE = 1.0 DEFAULT_TEMPERATURE = 1.0

View file

@ -41,10 +41,14 @@ from lightrag.constants import (
DEFAULT_MAX_PARALLEL_INSERT, DEFAULT_MAX_PARALLEL_INSERT,
DEFAULT_MAX_GRAPH_NODES, DEFAULT_MAX_GRAPH_NODES,
DEFAULT_MAX_SOURCE_IDS_PER_ENTITY, DEFAULT_MAX_SOURCE_IDS_PER_ENTITY,
DEFAULT_MAX_SOURCE_IDS_PER_RELATION,
DEFAULT_ENTITY_TYPES, DEFAULT_ENTITY_TYPES,
DEFAULT_SUMMARY_LANGUAGE, DEFAULT_SUMMARY_LANGUAGE,
DEFAULT_LLM_TIMEOUT, DEFAULT_LLM_TIMEOUT,
DEFAULT_EMBEDDING_TIMEOUT, DEFAULT_EMBEDDING_TIMEOUT,
DEFAULT_SOURCE_IDS_LIMIT_METHOD,
DEFAULT_MAX_FILE_PATHS,
DEFAULT_FILE_PATH_MORE_PLACEHOLDER,
) )
from lightrag.utils import get_env_value from lightrag.utils import get_env_value
@ -99,6 +103,9 @@ from lightrag.utils import (
generate_track_id, generate_track_id,
convert_to_user_format, convert_to_user_format,
logger, logger,
subtract_source_ids,
make_relation_chunk_key,
normalize_source_ids_limit_method,
) )
from lightrag.types import KnowledgeGraph from lightrag.types import KnowledgeGraph
from dotenv import load_dotenv from dotenv import load_dotenv
@ -362,10 +369,40 @@ class LightRAG:
"""Maximum number of graph nodes to return in knowledge graph queries.""" """Maximum number of graph nodes to return in knowledge graph queries."""
max_source_ids_per_entity: int = field( max_source_ids_per_entity: int = field(
default=get_env_value("MAX_SOURCE_IDS_PER_ENTITY", DEFAULT_MAX_SOURCE_IDS_PER_ENTITY, int) default=get_env_value(
"MAX_SOURCE_IDS_PER_ENTITY", DEFAULT_MAX_SOURCE_IDS_PER_ENTITY, int
)
) )
"""Maximum number of source (chunk) ids in entity Grpah + VDB.""" """Maximum number of source (chunk) ids in entity Grpah + VDB."""
max_source_ids_per_relation: int = field(
default=get_env_value(
"MAX_SOURCE_IDS_PER_RELATION",
DEFAULT_MAX_SOURCE_IDS_PER_RELATION,
int,
)
)
"""Maximum number of source (chunk) ids in relation Graph + VDB."""
source_ids_limit_method: str = field(
default_factory=lambda: normalize_source_ids_limit_method(
get_env_value(
"SOURCE_IDS_LIMIT_METHOD",
DEFAULT_SOURCE_IDS_LIMIT_METHOD,
str,
)
)
)
"""Strategy for enforcing source_id limits: IGNORE_NEW or FIFO."""
max_file_paths: int = field(
default=get_env_value("MAX_FILE_PATHS", DEFAULT_MAX_FILE_PATHS, int)
)
"""Maximum number of file paths to store in entity/relation file_path field."""
file_path_more_placeholder: str = field(default=DEFAULT_FILE_PATH_MORE_PLACEHOLDER)
"""Placeholder text when file paths exceed max_file_paths limit."""
addon_params: dict[str, Any] = field( addon_params: dict[str, Any] = field(
default_factory=lambda: { default_factory=lambda: {
"language": get_env_value( "language": get_env_value(
@ -535,6 +572,18 @@ class LightRAG:
embedding_func=self.embedding_func, embedding_func=self.embedding_func,
) )
self.entity_chunks: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore
namespace=NameSpace.KV_STORE_ENTITY_CHUNKS,
workspace=self.workspace,
embedding_func=self.embedding_func,
)
self.relation_chunks: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore
namespace=NameSpace.KV_STORE_RELATION_CHUNKS,
workspace=self.workspace,
embedding_func=self.embedding_func,
)
self.chunk_entity_relation_graph: BaseGraphStorage = self.graph_storage_cls( # type: ignore self.chunk_entity_relation_graph: BaseGraphStorage = self.graph_storage_cls( # type: ignore
namespace=NameSpace.GRAPH_STORE_CHUNK_ENTITY_RELATION, namespace=NameSpace.GRAPH_STORE_CHUNK_ENTITY_RELATION,
workspace=self.workspace, workspace=self.workspace,
@ -594,6 +643,8 @@ class LightRAG:
self.text_chunks, self.text_chunks,
self.full_entities, self.full_entities,
self.full_relations, self.full_relations,
self.entity_chunks,
self.relation_chunks,
self.entities_vdb, self.entities_vdb,
self.relationships_vdb, self.relationships_vdb,
self.chunks_vdb, self.chunks_vdb,
@ -616,6 +667,8 @@ class LightRAG:
("text_chunks", self.text_chunks), ("text_chunks", self.text_chunks),
("full_entities", self.full_entities), ("full_entities", self.full_entities),
("full_relations", self.full_relations), ("full_relations", self.full_relations),
("entity_chunks", self.entity_chunks),
("relation_chunks", self.relation_chunks),
("entities_vdb", self.entities_vdb), ("entities_vdb", self.entities_vdb),
("relationships_vdb", self.relationships_vdb), ("relationships_vdb", self.relationships_vdb),
("chunks_vdb", self.chunks_vdb), ("chunks_vdb", self.chunks_vdb),
@ -671,6 +724,13 @@ class LightRAG:
logger.debug("No entities found in graph, skipping migration check") logger.debug("No entities found in graph, skipping migration check")
return return
try:
# Initialize chunk tracking storage after migration
await self._migrate_chunk_tracking_storage()
except Exception as e:
logger.error(f"Error during chunk_tracking migration: {e}")
raise e
# Check if full_entities and full_relations are empty # Check if full_entities and full_relations are empty
# Get all processed documents to check their entity/relation data # Get all processed documents to check their entity/relation data
try: try:
@ -711,11 +771,11 @@ class LightRAG:
except Exception as e: except Exception as e:
logger.error(f"Error during migration check: {e}") logger.error(f"Error during migration check: {e}")
# Don't raise the error, just log it to avoid breaking initialization raise e
except Exception as e: except Exception as e:
logger.error(f"Error in data migration check: {e}") logger.error(f"Error in data migration check: {e}")
# Don't raise the error to avoid breaking initialization raise e
async def _migrate_entity_relation_data(self, processed_docs: dict): async def _migrate_entity_relation_data(self, processed_docs: dict):
"""Migrate existing entity and relation data to full_entities and full_relations storage""" """Migrate existing entity and relation data to full_entities and full_relations storage"""
@ -814,6 +874,140 @@ class LightRAG:
f"Data migration completed: migrated {migration_count} documents with entities/relations" f"Data migration completed: migrated {migration_count} documents with entities/relations"
) )
async def _migrate_chunk_tracking_storage(self) -> None:
"""Ensure entity/relation chunk tracking KV stores exist and are seeded."""
if not self.entity_chunks or not self.relation_chunks:
return
need_entity_migration = False
need_relation_migration = False
try:
need_entity_migration = await self.entity_chunks.is_empty()
except Exception as exc: # pragma: no cover - defensive logging
logger.error(f"Failed to check entity chunks storage: {exc}")
need_entity_migration = True
try:
need_relation_migration = await self.relation_chunks.is_empty()
except Exception as exc: # pragma: no cover - defensive logging
logger.error(f"Failed to check relation chunks storage: {exc}")
need_relation_migration = True
if not need_entity_migration and not need_relation_migration:
return
BATCH_SIZE = 500 # Process 500 records per batch
if need_entity_migration:
try:
nodes = await self.chunk_entity_relation_graph.get_all_nodes()
except Exception as exc:
logger.error(f"Failed to fetch nodes for chunk migration: {exc}")
nodes = []
logger.info(f"Starting chunk_tracking data migration: {len(nodes)} nodes")
# Process nodes in batches
total_nodes = len(nodes)
total_batches = (total_nodes + BATCH_SIZE - 1) // BATCH_SIZE
total_migrated = 0
for batch_idx in range(total_batches):
start_idx = batch_idx * BATCH_SIZE
end_idx = min((batch_idx + 1) * BATCH_SIZE, total_nodes)
batch_nodes = nodes[start_idx:end_idx]
upsert_payload: dict[str, dict[str, object]] = {}
for node in batch_nodes:
entity_id = node.get("entity_id") or node.get("id")
if not entity_id:
continue
raw_source = node.get("source_id") or ""
chunk_ids = [
chunk_id
for chunk_id in raw_source.split(GRAPH_FIELD_SEP)
if chunk_id
]
if not chunk_ids:
continue
upsert_payload[entity_id] = {
"chunk_ids": chunk_ids,
"count": len(chunk_ids),
}
if upsert_payload:
await self.entity_chunks.upsert(upsert_payload)
total_migrated += len(upsert_payload)
logger.info(
f"Processed entity batch {batch_idx + 1}/{total_batches}: {len(upsert_payload)} records (total: {total_migrated}/{total_nodes})"
)
if total_migrated > 0:
# Persist entity_chunks data to disk
await self.entity_chunks.index_done_callback()
logger.info(
f"Entity chunk_tracking migration completed: {total_migrated} records persisted"
)
if need_relation_migration:
try:
edges = await self.chunk_entity_relation_graph.get_all_edges()
except Exception as exc:
logger.error(f"Failed to fetch edges for chunk migration: {exc}")
edges = []
logger.info(f"Starting chunk_tracking data migration: {len(edges)} edges")
# Process edges in batches
total_edges = len(edges)
total_batches = (total_edges + BATCH_SIZE - 1) // BATCH_SIZE
total_migrated = 0
for batch_idx in range(total_batches):
start_idx = batch_idx * BATCH_SIZE
end_idx = min((batch_idx + 1) * BATCH_SIZE, total_edges)
batch_edges = edges[start_idx:end_idx]
upsert_payload: dict[str, dict[str, object]] = {}
for edge in batch_edges:
src = edge.get("source") or edge.get("src_id") or edge.get("src")
tgt = edge.get("target") or edge.get("tgt_id") or edge.get("tgt")
if not src or not tgt:
continue
raw_source = edge.get("source_id") or ""
chunk_ids = [
chunk_id
for chunk_id in raw_source.split(GRAPH_FIELD_SEP)
if chunk_id
]
if not chunk_ids:
continue
storage_key = make_relation_chunk_key(src, tgt)
upsert_payload[storage_key] = {
"chunk_ids": chunk_ids,
"count": len(chunk_ids),
}
if upsert_payload:
await self.relation_chunks.upsert(upsert_payload)
total_migrated += len(upsert_payload)
logger.info(
f"Processed relation batch {batch_idx + 1}/{total_batches}: {len(upsert_payload)} records (total: {total_migrated}/{total_edges})"
)
if total_migrated > 0:
# Persist relation_chunks data to disk
await self.relation_chunks.index_done_callback()
logger.info(
f"Relation chunk_tracking migration completed: {total_migrated} records persisted"
)
async def get_graph_labels(self): async def get_graph_labels(self):
text = await self.chunk_entity_relation_graph.get_all_labels() text = await self.chunk_entity_relation_graph.get_all_labels()
return text return text
@ -1676,6 +1870,8 @@ class LightRAG:
pipeline_status=pipeline_status, pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock, pipeline_status_lock=pipeline_status_lock,
llm_response_cache=self.llm_response_cache, llm_response_cache=self.llm_response_cache,
entity_chunks_storage=self.entity_chunks,
relation_chunks_storage=self.relation_chunks,
current_file_number=current_file_number, current_file_number=current_file_number,
total_files=total_files, total_files=total_files,
file_path=file_path, file_path=file_path,
@ -1845,6 +2041,8 @@ class LightRAG:
self.text_chunks, self.text_chunks,
self.full_entities, self.full_entities,
self.full_relations, self.full_relations,
self.entity_chunks,
self.relation_chunks,
self.llm_response_cache, self.llm_response_cache,
self.entities_vdb, self.entities_vdb,
self.relationships_vdb, self.relationships_vdb,
@ -2718,9 +2916,11 @@ class LightRAG:
# 4. Analyze entities and relationships that will be affected # 4. Analyze entities and relationships that will be affected
entities_to_delete = set() entities_to_delete = set()
entities_to_rebuild = {} # entity_name -> remaining_chunk_ids entities_to_rebuild = {} # entity_name -> remaining chunk id list
relationships_to_delete = set() relationships_to_delete = set()
relationships_to_rebuild = {} # (src, tgt) -> remaining_chunk_ids relationships_to_rebuild = {} # (src, tgt) -> remaining chunk id list
entity_chunk_updates: dict[str, list[str]] = {}
relation_chunk_updates: dict[tuple[str, str], list[str]] = {}
try: try:
# Get affected entities and relations from full_entities and full_relations storage # Get affected entities and relations from full_entities and full_relations storage
@ -2776,14 +2976,41 @@ class LightRAG:
# Process entities # Process entities
for node_data in affected_nodes: for node_data in affected_nodes:
node_label = node_data.get("entity_id") node_label = node_data.get("entity_id")
if node_label and "source_id" in node_data: if not node_label:
sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP)) continue
remaining_sources = sources - chunk_ids
if not remaining_sources: existing_sources: list[str] = []
entities_to_delete.add(node_label) if self.entity_chunks:
elif remaining_sources != sources: stored_chunks = await self.entity_chunks.get_by_id(node_label)
entities_to_rebuild[node_label] = remaining_sources if stored_chunks and isinstance(stored_chunks, dict):
existing_sources = [
chunk_id
for chunk_id in stored_chunks.get("chunk_ids", [])
if chunk_id
]
if not existing_sources and node_data.get("source_id"):
existing_sources = [
chunk_id
for chunk_id in node_data["source_id"].split(
GRAPH_FIELD_SEP
)
if chunk_id
]
if not existing_sources:
continue
remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
if not remaining_sources:
entities_to_delete.add(node_label)
entity_chunk_updates[node_label] = []
elif remaining_sources != existing_sources:
entities_to_rebuild[node_label] = remaining_sources
entity_chunk_updates[node_label] = remaining_sources
else:
logger.info(f"Untouch entity: {node_label}")
async with pipeline_status_lock: async with pipeline_status_lock:
log_message = f"Found {len(entities_to_rebuild)} affected entities" log_message = f"Found {len(entities_to_rebuild)} affected entities"
@ -2796,21 +3023,51 @@ class LightRAG:
src = edge_data.get("source") src = edge_data.get("source")
tgt = edge_data.get("target") tgt = edge_data.get("target")
if src and tgt and "source_id" in edge_data: if not src or not tgt or "source_id" not in edge_data:
edge_tuple = tuple(sorted((src, tgt))) continue
if (
edge_tuple in relationships_to_delete
or edge_tuple in relationships_to_rebuild
):
continue
sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP)) edge_tuple = tuple(sorted((src, tgt)))
remaining_sources = sources - chunk_ids if (
edge_tuple in relationships_to_delete
or edge_tuple in relationships_to_rebuild
):
continue
if not remaining_sources: existing_sources: list[str] = []
relationships_to_delete.add(edge_tuple) if self.relation_chunks:
elif remaining_sources != sources: storage_key = make_relation_chunk_key(src, tgt)
relationships_to_rebuild[edge_tuple] = remaining_sources stored_chunks = await self.relation_chunks.get_by_id(
storage_key
)
if stored_chunks and isinstance(stored_chunks, dict):
existing_sources = [
chunk_id
for chunk_id in stored_chunks.get("chunk_ids", [])
if chunk_id
]
if not existing_sources:
existing_sources = [
chunk_id
for chunk_id in edge_data["source_id"].split(
GRAPH_FIELD_SEP
)
if chunk_id
]
if not existing_sources:
continue
remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
if not remaining_sources:
relationships_to_delete.add(edge_tuple)
relation_chunk_updates[edge_tuple] = []
elif remaining_sources != existing_sources:
relationships_to_rebuild[edge_tuple] = remaining_sources
relation_chunk_updates[edge_tuple] = remaining_sources
else:
logger.info(f"Untouch relation: {edge_tuple}")
async with pipeline_status_lock: async with pipeline_status_lock:
log_message = ( log_message = (
@ -2820,6 +3077,45 @@ class LightRAG:
pipeline_status["latest_message"] = log_message pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message) pipeline_status["history_messages"].append(log_message)
current_time = int(time.time())
if entity_chunk_updates and self.entity_chunks:
entity_upsert_payload = {}
entity_delete_ids: set[str] = set()
for entity_name, remaining in entity_chunk_updates.items():
if not remaining:
entity_delete_ids.add(entity_name)
else:
entity_upsert_payload[entity_name] = {
"chunk_ids": remaining,
"count": len(remaining),
"updated_at": current_time,
}
if entity_delete_ids:
await self.entity_chunks.delete(list(entity_delete_ids))
if entity_upsert_payload:
await self.entity_chunks.upsert(entity_upsert_payload)
if relation_chunk_updates and self.relation_chunks:
relation_upsert_payload = {}
relation_delete_ids: set[str] = set()
for edge_tuple, remaining in relation_chunk_updates.items():
storage_key = make_relation_chunk_key(*edge_tuple)
if not remaining:
relation_delete_ids.add(storage_key)
else:
relation_upsert_payload[storage_key] = {
"chunk_ids": remaining,
"count": len(remaining),
"updated_at": current_time,
}
if relation_delete_ids:
await self.relation_chunks.delete(list(relation_delete_ids))
if relation_upsert_payload:
await self.relation_chunks.upsert(relation_upsert_payload)
except Exception as e: except Exception as e:
logger.error(f"Failed to process graph analysis results: {e}") logger.error(f"Failed to process graph analysis results: {e}")
raise Exception(f"Failed to process graph dependencies: {e}") from e raise Exception(f"Failed to process graph dependencies: {e}") from e
@ -2914,6 +3210,8 @@ class LightRAG:
global_config=asdict(self), global_config=asdict(self),
pipeline_status=pipeline_status, pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock, pipeline_status_lock=pipeline_status_lock,
entity_chunks_storage=self.entity_chunks,
relation_chunks_storage=self.relation_chunks,
) )
except Exception as e: except Exception as e:

View file

@ -7,7 +7,7 @@ import json_repair
from typing import Any, AsyncIterator, overload, Literal from typing import Any, AsyncIterator, overload, Literal
from collections import Counter, defaultdict from collections import Counter, defaultdict
from .utils import ( from lightrag.utils import (
logger, logger,
compute_mdhash_id, compute_mdhash_id,
Tokenizer, Tokenizer,
@ -26,15 +26,16 @@ from .utils import (
pick_by_weighted_polling, pick_by_weighted_polling,
pick_by_vector_similarity, pick_by_vector_similarity,
process_chunks_unified, process_chunks_unified,
build_file_path,
truncate_entity_source_id,
safe_vdb_operation_with_exception, safe_vdb_operation_with_exception,
create_prefixed_exception, create_prefixed_exception,
fix_tuple_delimiter_corruption, fix_tuple_delimiter_corruption,
convert_to_user_format, convert_to_user_format,
generate_reference_list_from_chunks, generate_reference_list_from_chunks,
apply_source_ids_limit,
merge_source_ids,
make_relation_chunk_key,
) )
from .base import ( from lightrag.base import (
BaseGraphStorage, BaseGraphStorage,
BaseKVStorage, BaseKVStorage,
BaseVectorStorage, BaseVectorStorage,
@ -43,8 +44,8 @@ from .base import (
QueryResult, QueryResult,
QueryContextResult, QueryContextResult,
) )
from .prompt import PROMPTS from lightrag.prompt import PROMPTS
from .constants import ( from lightrag.constants import (
GRAPH_FIELD_SEP, GRAPH_FIELD_SEP,
DEFAULT_MAX_ENTITY_TOKENS, DEFAULT_MAX_ENTITY_TOKENS,
DEFAULT_MAX_RELATION_TOKENS, DEFAULT_MAX_RELATION_TOKENS,
@ -53,8 +54,11 @@ from .constants import (
DEFAULT_KG_CHUNK_PICK_METHOD, DEFAULT_KG_CHUNK_PICK_METHOD,
DEFAULT_ENTITY_TYPES, DEFAULT_ENTITY_TYPES,
DEFAULT_SUMMARY_LANGUAGE, DEFAULT_SUMMARY_LANGUAGE,
SOURCE_IDS_LIMIT_METHOD_KEEP,
SOURCE_IDS_LIMIT_METHOD_FIFO,
DEFAULT_FILE_PATH_MORE_PLACEHOLDER,
) )
from .kg.shared_storage import get_storage_keyed_lock from lightrag.kg.shared_storage import get_storage_keyed_lock
import time import time
from dotenv import load_dotenv from dotenv import load_dotenv
@ -474,8 +478,8 @@ async def _handle_single_relationship_extraction(
async def _rebuild_knowledge_from_chunks( async def _rebuild_knowledge_from_chunks(
entities_to_rebuild: dict[str, set[str]], entities_to_rebuild: dict[str, list[str]],
relationships_to_rebuild: dict[tuple[str, str], set[str]], relationships_to_rebuild: dict[tuple[str, str], list[str]],
knowledge_graph_inst: BaseGraphStorage, knowledge_graph_inst: BaseGraphStorage,
entities_vdb: BaseVectorStorage, entities_vdb: BaseVectorStorage,
relationships_vdb: BaseVectorStorage, relationships_vdb: BaseVectorStorage,
@ -484,6 +488,8 @@ async def _rebuild_knowledge_from_chunks(
global_config: dict[str, str], global_config: dict[str, str],
pipeline_status: dict | None = None, pipeline_status: dict | None = None,
pipeline_status_lock=None, pipeline_status_lock=None,
entity_chunks_storage: BaseKVStorage | None = None,
relation_chunks_storage: BaseKVStorage | None = None,
) -> None: ) -> None:
"""Rebuild entity and relationship descriptions from cached extraction results with parallel processing """Rebuild entity and relationship descriptions from cached extraction results with parallel processing
@ -492,8 +498,8 @@ async def _rebuild_knowledge_from_chunks(
controlled by llm_model_max_async and using get_storage_keyed_lock for data consistency. controlled by llm_model_max_async and using get_storage_keyed_lock for data consistency.
Args: Args:
entities_to_rebuild: Dict mapping entity_name -> set of remaining chunk_ids entities_to_rebuild: Dict mapping entity_name -> list of remaining chunk_ids
relationships_to_rebuild: Dict mapping (src, tgt) -> set of remaining chunk_ids relationships_to_rebuild: Dict mapping (src, tgt) -> list of remaining chunk_ids
knowledge_graph_inst: Knowledge graph storage knowledge_graph_inst: Knowledge graph storage
entities_vdb: Entity vector database entities_vdb: Entity vector database
relationships_vdb: Relationship vector database relationships_vdb: Relationship vector database
@ -502,6 +508,8 @@ async def _rebuild_knowledge_from_chunks(
global_config: Global configuration containing llm_model_max_async global_config: Global configuration containing llm_model_max_async
pipeline_status: Pipeline status dictionary pipeline_status: Pipeline status dictionary
pipeline_status_lock: Lock for pipeline status pipeline_status_lock: Lock for pipeline status
entity_chunks_storage: KV storage maintaining full chunk IDs per entity
relation_chunks_storage: KV storage maintaining full chunk IDs per relation
""" """
if not entities_to_rebuild and not relationships_to_rebuild: if not entities_to_rebuild and not relationships_to_rebuild:
return return
@ -641,10 +649,11 @@ async def _rebuild_knowledge_from_chunks(
chunk_entities=chunk_entities, chunk_entities=chunk_entities,
llm_response_cache=llm_response_cache, llm_response_cache=llm_response_cache,
global_config=global_config, global_config=global_config,
entity_chunks_storage=entity_chunks_storage,
) )
rebuilt_entities_count += 1 rebuilt_entities_count += 1
status_message = ( status_message = (
f"Rebuilt `{entity_name}` from {len(chunk_ids)} chunks" f"Rebuild `{entity_name}` from {len(chunk_ids)} chunks"
) )
logger.info(status_message) logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None: if pipeline_status is not None and pipeline_status_lock is not None:
@ -682,16 +691,11 @@ async def _rebuild_knowledge_from_chunks(
chunk_relationships=chunk_relationships, chunk_relationships=chunk_relationships,
llm_response_cache=llm_response_cache, llm_response_cache=llm_response_cache,
global_config=global_config, global_config=global_config,
relation_chunks_storage=relation_chunks_storage,
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
) )
rebuilt_relationships_count += 1 rebuilt_relationships_count += 1
status_message = (
f"Rebuilt `{src} - {tgt}` from {len(chunk_ids)} chunks"
)
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
except Exception as e: except Exception as e:
failed_relationships_count += 1 failed_relationships_count += 1
status_message = f"Failed to rebuild `{src} - {tgt}`: {e}" status_message = f"Failed to rebuild `{src} - {tgt}`: {e}"
@ -1002,10 +1006,13 @@ async def _rebuild_single_entity(
knowledge_graph_inst: BaseGraphStorage, knowledge_graph_inst: BaseGraphStorage,
entities_vdb: BaseVectorStorage, entities_vdb: BaseVectorStorage,
entity_name: str, entity_name: str,
chunk_ids: set[str], chunk_ids: list[str],
chunk_entities: dict, chunk_entities: dict,
llm_response_cache: BaseKVStorage, llm_response_cache: BaseKVStorage,
global_config: dict[str, str], global_config: dict[str, str],
entity_chunks_storage: BaseKVStorage | None = None,
pipeline_status: dict | None = None,
pipeline_status_lock=None,
) -> None: ) -> None:
"""Rebuild a single entity from cached extraction results""" """Rebuild a single entity from cached extraction results"""
@ -1016,7 +1023,11 @@ async def _rebuild_single_entity(
# Helper function to update entity in both graph and vector storage # Helper function to update entity in both graph and vector storage
async def _update_entity_storage( async def _update_entity_storage(
final_description: str, entity_type: str, file_paths: set[str] final_description: str,
entity_type: str,
file_paths: set[str],
source_chunk_ids: list[str],
truncation_info: str = "",
): ):
try: try:
# Update entity in graph storage (critical path) # Update entity in graph storage (critical path)
@ -1024,10 +1035,12 @@ async def _rebuild_single_entity(
**current_entity, **current_entity,
"description": final_description, "description": final_description,
"entity_type": entity_type, "entity_type": entity_type,
"source_id": GRAPH_FIELD_SEP.join(chunk_ids), "source_id": GRAPH_FIELD_SEP.join(source_chunk_ids),
"file_path": GRAPH_FIELD_SEP.join(file_paths) "file_path": GRAPH_FIELD_SEP.join(file_paths)
if file_paths if file_paths
else current_entity.get("file_path", "unknown_source"), else current_entity.get("file_path", "unknown_source"),
"created_at": int(time.time()),
"truncate": truncation_info,
} }
await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data) await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data)
@ -1060,9 +1073,33 @@ async def _rebuild_single_entity(
logger.error(error_msg) logger.error(error_msg)
raise # Re-raise exception raise # Re-raise exception
# Collect all entity data from relevant chunks # normalized_chunk_ids = merge_source_ids([], chunk_ids)
normalized_chunk_ids = chunk_ids
if entity_chunks_storage is not None and normalized_chunk_ids:
await entity_chunks_storage.upsert(
{
entity_name: {
"chunk_ids": normalized_chunk_ids,
"count": len(normalized_chunk_ids),
}
}
)
limit_method = (
global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP
)
limited_chunk_ids = apply_source_ids_limit(
normalized_chunk_ids,
global_config["max_source_ids_per_entity"],
limit_method,
identifier=f"`{entity_name}`",
)
# Collect all entity data from relevant (limited) chunks
all_entity_data = [] all_entity_data = []
for chunk_id in chunk_ids: for chunk_id in limited_chunk_ids:
if chunk_id in chunk_entities and entity_name in chunk_entities[chunk_id]: if chunk_id in chunk_entities and entity_name in chunk_entities[chunk_id]:
all_entity_data.extend(chunk_entities[chunk_id][entity_name]) all_entity_data.extend(chunk_entities[chunk_id][entity_name])
@ -1109,13 +1146,19 @@ async def _rebuild_single_entity(
final_description = current_entity.get("description", "") final_description = current_entity.get("description", "")
entity_type = current_entity.get("entity_type", "UNKNOWN") entity_type = current_entity.get("entity_type", "UNKNOWN")
await _update_entity_storage(final_description, entity_type, file_paths) await _update_entity_storage(
final_description,
entity_type,
file_paths,
limited_chunk_ids,
)
return return
# Process cached entity data # Process cached entity data
descriptions = [] descriptions = []
entity_types = [] entity_types = []
file_paths = set() file_paths_list = []
seen_paths = set()
for entity_data in all_entity_data: for entity_data in all_entity_data:
if entity_data.get("description"): if entity_data.get("description"):
@ -1123,7 +1166,35 @@ async def _rebuild_single_entity(
if entity_data.get("entity_type"): if entity_data.get("entity_type"):
entity_types.append(entity_data["entity_type"]) entity_types.append(entity_data["entity_type"])
if entity_data.get("file_path"): if entity_data.get("file_path"):
file_paths.add(entity_data["file_path"]) file_path = entity_data["file_path"]
if file_path and file_path not in seen_paths:
file_paths_list.append(file_path)
seen_paths.add(file_path)
# Apply MAX_FILE_PATHS limit
max_file_paths = global_config.get("max_file_paths")
file_path_placeholder = global_config.get(
"file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
)
limit_method = global_config.get("source_ids_limit_method")
original_count = len(file_paths_list)
if original_count > max_file_paths:
if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
# FIFO: keep tail (newest), discard head
file_paths_list = file_paths_list[-max_file_paths:]
else:
# KEEP: keep head (earliest), discard tail
file_paths_list = file_paths_list[:max_file_paths]
file_paths_list.append(
f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..."
)
logger.info(
f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
)
file_paths = set(file_paths_list)
# Remove duplicates while preserving order # Remove duplicates while preserving order
description_list = list(dict.fromkeys(descriptions)) description_list = list(dict.fromkeys(descriptions))
@ -1149,7 +1220,31 @@ async def _rebuild_single_entity(
else: else:
final_description = current_entity.get("description", "") final_description = current_entity.get("description", "")
await _update_entity_storage(final_description, entity_type, file_paths) if len(limited_chunk_ids) < len(normalized_chunk_ids):
truncation_info = (
f"{limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)}"
)
else:
truncation_info = ""
await _update_entity_storage(
final_description,
entity_type,
file_paths,
limited_chunk_ids,
truncation_info,
)
# Log rebuild completion with truncation info
status_message = f"Rebuild `{entity_name}` from {len(chunk_ids)} chunks"
if truncation_info:
status_message += f" ({truncation_info})"
logger.info(status_message)
# Update pipeline status
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
async def _rebuild_single_relationship( async def _rebuild_single_relationship(
@ -1157,10 +1252,13 @@ async def _rebuild_single_relationship(
relationships_vdb: BaseVectorStorage, relationships_vdb: BaseVectorStorage,
src: str, src: str,
tgt: str, tgt: str,
chunk_ids: set[str], chunk_ids: list[str],
chunk_relationships: dict, chunk_relationships: dict,
llm_response_cache: BaseKVStorage, llm_response_cache: BaseKVStorage,
global_config: dict[str, str], global_config: dict[str, str],
relation_chunks_storage: BaseKVStorage | None = None,
pipeline_status: dict | None = None,
pipeline_status_lock=None,
) -> None: ) -> None:
"""Rebuild a single relationship from cached extraction results """Rebuild a single relationship from cached extraction results
@ -1173,9 +1271,33 @@ async def _rebuild_single_relationship(
if not current_relationship: if not current_relationship:
return return
# normalized_chunk_ids = merge_source_ids([], chunk_ids)
normalized_chunk_ids = chunk_ids
if relation_chunks_storage is not None and normalized_chunk_ids:
storage_key = make_relation_chunk_key(src, tgt)
await relation_chunks_storage.upsert(
{
storage_key: {
"chunk_ids": normalized_chunk_ids,
"count": len(normalized_chunk_ids),
}
}
)
limit_method = (
global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP
)
limited_chunk_ids = apply_source_ids_limit(
normalized_chunk_ids,
global_config["max_source_ids_per_relation"],
limit_method,
identifier=f"`{src}`~`{tgt}`",
)
# Collect all relationship data from relevant chunks # Collect all relationship data from relevant chunks
all_relationship_data = [] all_relationship_data = []
for chunk_id in chunk_ids: for chunk_id in limited_chunk_ids:
if chunk_id in chunk_relationships: if chunk_id in chunk_relationships:
# Check both (src, tgt) and (tgt, src) since relationships can be bidirectional # Check both (src, tgt) and (tgt, src) since relationships can be bidirectional
for edge_key in [(src, tgt), (tgt, src)]: for edge_key in [(src, tgt), (tgt, src)]:
@ -1192,7 +1314,8 @@ async def _rebuild_single_relationship(
descriptions = [] descriptions = []
keywords = [] keywords = []
weights = [] weights = []
file_paths = set() file_paths_list = []
seen_paths = set()
for rel_data in all_relationship_data: for rel_data in all_relationship_data:
if rel_data.get("description"): if rel_data.get("description"):
@ -1202,7 +1325,35 @@ async def _rebuild_single_relationship(
if rel_data.get("weight"): if rel_data.get("weight"):
weights.append(rel_data["weight"]) weights.append(rel_data["weight"])
if rel_data.get("file_path"): if rel_data.get("file_path"):
file_paths.add(rel_data["file_path"]) file_path = rel_data["file_path"]
if file_path and file_path not in seen_paths:
file_paths_list.append(file_path)
seen_paths.add(file_path)
# Apply count limit
max_file_paths = global_config.get("max_file_paths")
file_path_placeholder = global_config.get(
"file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
)
limit_method = global_config.get("source_ids_limit_method")
original_count = len(file_paths_list)
if original_count > max_file_paths:
if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
# FIFO: keep tail (newest), discard head
file_paths_list = file_paths_list[-max_file_paths:]
else:
# KEEP: keep head (earliest), discard tail
file_paths_list = file_paths_list[:max_file_paths]
file_paths_list.append(
f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..."
)
logger.info(
f"Limited `{src}`~`{tgt}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
)
file_paths = set(file_paths_list)
# Remove duplicates while preserving order # Remove duplicates while preserving order
description_list = list(dict.fromkeys(descriptions)) description_list = list(dict.fromkeys(descriptions))
@ -1230,6 +1381,13 @@ async def _rebuild_single_relationship(
# fallback to keep current(unchanged) # fallback to keep current(unchanged)
final_description = current_relationship.get("description", "") final_description = current_relationship.get("description", "")
if len(limited_chunk_ids) < len(normalized_chunk_ids):
truncation_info = (
f"{limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)}"
)
else:
truncation_info = ""
# Update relationship in graph storage # Update relationship in graph storage
updated_relationship_data = { updated_relationship_data = {
**current_relationship, **current_relationship,
@ -1238,10 +1396,11 @@ async def _rebuild_single_relationship(
else current_relationship.get("description", ""), else current_relationship.get("description", ""),
"keywords": combined_keywords, "keywords": combined_keywords,
"weight": weight, "weight": weight,
"source_id": GRAPH_FIELD_SEP.join(chunk_ids), "source_id": GRAPH_FIELD_SEP.join(limited_chunk_ids),
"file_path": GRAPH_FIELD_SEP.join([fp for fp in file_paths if fp]) "file_path": GRAPH_FIELD_SEP.join([fp for fp in file_paths if fp])
if file_paths if file_paths
else current_relationship.get("file_path", "unknown_source"), else current_relationship.get("file_path", "unknown_source"),
"truncate": truncation_info,
} }
await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data) await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)
@ -1287,6 +1446,25 @@ async def _rebuild_single_relationship(
logger.error(error_msg) logger.error(error_msg)
raise # Re-raise exception raise # Re-raise exception
# Log rebuild completion with truncation info
status_message = f"Rebuild `{src} - {tgt}` from {len(chunk_ids)} chunks"
if truncation_info:
status_message += f" ({truncation_info})"
# Add truncation info from apply_source_ids_limit if truncation occurred
if len(limited_chunk_ids) < len(normalized_chunk_ids):
truncation_info = (
f" ({limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)})"
)
status_message += truncation_info
logger.info(status_message)
# Update pipeline status
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
async def _merge_nodes_then_upsert( async def _merge_nodes_then_upsert(
entity_name: str, entity_name: str,
@ -1296,6 +1474,7 @@ async def _merge_nodes_then_upsert(
pipeline_status: dict = None, pipeline_status: dict = None,
pipeline_status_lock=None, pipeline_status_lock=None,
llm_response_cache: BaseKVStorage | None = None, llm_response_cache: BaseKVStorage | None = None,
entity_chunks_storage: BaseKVStorage | None = None,
): ):
"""Get existing nodes from knowledge graph use name,if exists, merge data, else create, then upsert.""" """Get existing nodes from knowledge graph use name,if exists, merge data, else create, then upsert."""
already_entity_types = [] already_entity_types = []
@ -1318,10 +1497,76 @@ async def _merge_nodes_then_upsert(
reverse=True, reverse=True,
)[0][0] # Get the entity type with the highest count )[0][0] # Get the entity type with the highest count
original_nodes_count = len(nodes_data)
new_source_ids = [dp["source_id"] for dp in nodes_data if dp.get("source_id")]
existing_full_source_ids = []
if entity_chunks_storage is not None:
stored_chunks = await entity_chunks_storage.get_by_id(entity_name)
if stored_chunks and isinstance(stored_chunks, dict):
existing_full_source_ids = [
chunk_id for chunk_id in stored_chunks.get("chunk_ids", []) if chunk_id
]
if not existing_full_source_ids:
existing_full_source_ids = [
chunk_id for chunk_id in already_source_ids if chunk_id
]
full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids)
if entity_chunks_storage is not None and full_source_ids:
await entity_chunks_storage.upsert(
{
entity_name: {
"chunk_ids": full_source_ids,
"count": len(full_source_ids),
}
}
)
limit_method = global_config.get("source_ids_limit_method")
max_source_limit = global_config.get("max_source_ids_per_entity")
source_ids = apply_source_ids_limit(
full_source_ids,
max_source_limit,
limit_method,
identifier=f"`{entity_name}`",
)
# Only apply filtering in KEEP(ignore new) mode
if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP:
allowed_source_ids = set(source_ids)
filtered_nodes = []
for dp in nodes_data:
source_id = dp.get("source_id")
# Skip descriptions sourced from chunks dropped by the limitation cap
if (
source_id
and source_id not in allowed_source_ids
and source_id not in existing_full_source_ids
):
continue
filtered_nodes.append(dp)
nodes_data = filtered_nodes
else:
# In FIFO mode, keep all node descriptions - truncation happens at source_ids level only
nodes_data = list(nodes_data)
skip_summary_due_to_limit = (
limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP
and len(existing_full_source_ids) >= max_source_limit
and not nodes_data
and already_description
)
# Deduplicate by description, keeping first occurrence # Deduplicate by description, keeping first occurrence
unique_nodes = {} unique_nodes = {}
for dp in nodes_data: for dp in nodes_data:
desc = dp["description"] desc = dp.get("description")
if not desc:
continue
if desc not in unique_nodes: if desc not in unique_nodes:
unique_nodes[desc] = dp unique_nodes[desc] = dp
@ -1332,17 +1577,31 @@ async def _merge_nodes_then_upsert(
) )
sorted_descriptions = [dp["description"] for dp in sorted_nodes] sorted_descriptions = [dp["description"] for dp in sorted_nodes]
truncation_info = ""
dd_message = ""
# Combine already_description with sorted new sorted descriptions # Combine already_description with sorted new sorted descriptions
description_list = already_description + sorted_descriptions description_list = already_description + sorted_descriptions
deduplicated_num = original_nodes_count - len(sorted_descriptions)
if deduplicated_num > 0:
dd_message = f"dd:{deduplicated_num}"
num_fragment = len(description_list) num_fragment = len(description_list)
already_fragment = len(already_description) already_fragment = len(already_description)
deduplicated_num = already_fragment + len(nodes_data) - num_fragment if skip_summary_due_to_limit:
if deduplicated_num > 0: description = (
dd_message = f"(dd:{deduplicated_num})" already_node.get("description", "(no description)")
else: if already_node
dd_message = "" else "(no description)"
if num_fragment > 0: )
llm_was_used = False
status_message = f"Skip merge for `{entity_name}`: IGNORE_NEW limit reached"
logger.debug(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
elif num_fragment > 0:
# Get summary and LLM usage status # Get summary and LLM usage status
description, llm_was_used = await _handle_entity_relation_summary( description, llm_was_used = await _handle_entity_relation_summary(
"Entity", "Entity",
@ -1355,9 +1614,16 @@ async def _merge_nodes_then_upsert(
# Log based on actual LLM usage # Log based on actual LLM usage
if llm_was_used: if llm_was_used:
status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}"
else: else:
status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}"
# Add truncation info from apply_source_ids_limit if truncation occurred
if len(source_ids) < len(full_source_ids):
truncation_info = f"{limit_method}:{len(source_ids)}/{len(full_source_ids)}"
if dd_message or truncation_info:
status_message += f" ({', '.join([truncation_info, dd_message])})"
if already_fragment > 0 or llm_was_used: if already_fragment > 0 or llm_was_used:
logger.info(status_message) logger.info(status_message)
@ -1372,12 +1638,67 @@ async def _merge_nodes_then_upsert(
logger.error(f"Entity {entity_name} has no description") logger.error(f"Entity {entity_name} has no description")
description = "(no description)" description = "(no description)"
merged_source_ids: set = set([dp["source_id"] for dp in nodes_data] + already_source_ids)
source_ids = truncate_entity_source_id(merged_source_ids, entity_name, global_config)
source_id = GRAPH_FIELD_SEP.join(source_ids) source_id = GRAPH_FIELD_SEP.join(source_ids)
file_path = build_file_path(already_file_paths, nodes_data, entity_name) # Build file_path with count limit
if skip_summary_due_to_limit:
# Skip limit, keep original file_path
file_path = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp)
else:
# Collect and apply limit
file_paths_list = []
seen_paths = set()
# Get placeholder to filter it out
file_path_placeholder = global_config.get(
"file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
)
# Collect from already_file_paths, excluding placeholder
for fp in already_file_paths:
# Skip placeholders (format: "...{placeholder}(showing X of Y)...")
if (
fp
and not fp.startswith(f"...{file_path_placeholder}")
and fp not in seen_paths
):
file_paths_list.append(fp)
seen_paths.add(fp)
# Collect from new data
for dp in nodes_data:
file_path_item = dp.get("file_path")
if file_path_item and file_path_item not in seen_paths:
file_paths_list.append(file_path_item)
seen_paths.add(file_path_item)
# Apply count limit
max_file_paths = global_config.get("max_file_paths")
if len(file_paths_list) > max_file_paths:
limit_method = global_config.get(
"source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
)
file_path_placeholder = global_config.get(
"file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
)
original_count = len(file_paths_list)
if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
# FIFO: keep tail (newest), discard head
file_paths_list = file_paths_list[-max_file_paths:]
else:
# KEEP: keep head (earliest), discard tail
file_paths_list = file_paths_list[:max_file_paths]
file_paths_list.append(
f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..."
)
logger.info(
f"Limited `{entity_name}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
)
file_path = GRAPH_FIELD_SEP.join(file_paths_list)
node_data = dict( node_data = dict(
entity_id=entity_name, entity_id=entity_name,
@ -1386,6 +1707,7 @@ async def _merge_nodes_then_upsert(
source_id=source_id, source_id=source_id,
file_path=file_path, file_path=file_path,
created_at=int(time.time()), created_at=int(time.time()),
truncate=truncation_info,
) )
await knowledge_graph_inst.upsert_node( await knowledge_graph_inst.upsert_node(
entity_name, entity_name,
@ -1405,6 +1727,7 @@ async def _merge_edges_then_upsert(
pipeline_status_lock=None, pipeline_status_lock=None,
llm_response_cache: BaseKVStorage | None = None, llm_response_cache: BaseKVStorage | None = None,
added_entities: list = None, # New parameter to track entities added during edge processing added_entities: list = None, # New parameter to track entities added during edge processing
relation_chunks_storage: BaseKVStorage | None = None,
): ):
if src_id == tgt_id: if src_id == tgt_id:
return None return None
@ -1448,16 +1771,85 @@ async def _merge_edges_then_upsert(
) )
) )
original_edges_count = len(edges_data)
new_source_ids = [dp["source_id"] for dp in edges_data if dp.get("source_id")]
storage_key = make_relation_chunk_key(src_id, tgt_id)
existing_full_source_ids = []
if relation_chunks_storage is not None:
stored_chunks = await relation_chunks_storage.get_by_id(storage_key)
if stored_chunks and isinstance(stored_chunks, dict):
existing_full_source_ids = [
chunk_id for chunk_id in stored_chunks.get("chunk_ids", []) if chunk_id
]
if not existing_full_source_ids:
existing_full_source_ids = [
chunk_id for chunk_id in already_source_ids if chunk_id
]
full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids)
if relation_chunks_storage is not None and full_source_ids:
await relation_chunks_storage.upsert(
{
storage_key: {
"chunk_ids": full_source_ids,
"count": len(full_source_ids),
}
}
)
limit_method = global_config.get("source_ids_limit_method")
max_source_limit = global_config.get("max_source_ids_per_relation")
source_ids = apply_source_ids_limit(
full_source_ids,
max_source_limit,
limit_method,
identifier=f"`{src_id}`~`{tgt_id}`",
)
limit_method = (
global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP
)
# Only apply filtering in IGNORE_NEW mode
if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP:
allowed_source_ids = set(source_ids)
filtered_edges = []
for dp in edges_data:
source_id = dp.get("source_id")
# Skip relationship fragments sourced from chunks dropped by the IGNORE_NEW cap
if (
source_id
and source_id not in allowed_source_ids
and source_id not in existing_full_source_ids
):
continue
filtered_edges.append(dp)
edges_data = filtered_edges
else:
# In FIFO mode, keep all edge descriptions - truncation happens at source_ids level only
edges_data = list(edges_data)
skip_summary_due_to_limit = (
limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP
and len(existing_full_source_ids) >= max_source_limit
and not edges_data
and already_description
)
# Process edges_data with None checks # Process edges_data with None checks
weight = sum([dp["weight"] for dp in edges_data] + already_weights) weight = sum([dp["weight"] for dp in edges_data] + already_weights)
# Deduplicate by description, keeping first occurrence # Deduplicate by description, keeping first occurrence
unique_edges = {} unique_edges = {}
for dp in edges_data: for dp in edges_data:
if dp.get("description"): description_value = dp.get("description")
desc = dp["description"] if not description_value:
if desc not in unique_edges: continue
unique_edges[desc] = dp if description_value not in unique_edges:
unique_edges[description_value] = dp
# Sort description by timestamp, then by description length (largest to smallest) when timestamps are the same # Sort description by timestamp, then by description length (largest to smallest) when timestamps are the same
sorted_edges = sorted( sorted_edges = sorted(
@ -1466,17 +1858,34 @@ async def _merge_edges_then_upsert(
) )
sorted_descriptions = [dp["description"] for dp in sorted_edges] sorted_descriptions = [dp["description"] for dp in sorted_edges]
truncation_info = ""
dd_message = ""
# Combine already_description with sorted new descriptions # Combine already_description with sorted new descriptions
description_list = already_description + sorted_descriptions description_list = already_description + sorted_descriptions
deduplicated_num = original_edges_count - len(sorted_descriptions)
if deduplicated_num > 0:
dd_message = f"dd:{deduplicated_num}"
num_fragment = len(description_list) num_fragment = len(description_list)
already_fragment = len(already_description) already_fragment = len(already_description)
deduplicated_num = already_fragment + len(edges_data) - num_fragment
if deduplicated_num > 0: if skip_summary_due_to_limit:
dd_message = f"(dd:{deduplicated_num})" description = (
else: already_edge.get("description", "(no description)")
dd_message = "" if already_edge
if num_fragment > 0: else "(no description)"
)
llm_was_used = False
status_message = (
f"Skip merge for `{src_id}`~`{tgt_id}`: IGNORE_NEW limit reached"
)
logger.debug(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
elif num_fragment > 0:
# Get summary and LLM usage status # Get summary and LLM usage status
description, llm_was_used = await _handle_entity_relation_summary( description, llm_was_used = await _handle_entity_relation_summary(
"Relation", "Relation",
@ -1489,9 +1898,16 @@ async def _merge_edges_then_upsert(
# Log based on actual LLM usage # Log based on actual LLM usage
if llm_was_used: if llm_was_used:
status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}"
else: else:
status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}"
# Add truncation info from apply_source_ids_limit if truncation occurred
if len(source_ids) < len(full_source_ids):
truncation_info = f"{limit_method}:{len(source_ids)}/{len(full_source_ids)}"
if dd_message or truncation_info:
status_message += f" ({', '.join([truncation_info, dd_message])})"
if already_fragment > 0 or llm_was_used: if already_fragment > 0 or llm_was_used:
logger.info(status_message) logger.info(status_message)
@ -1521,13 +1937,67 @@ async def _merge_edges_then_upsert(
# Join all unique keywords with commas # Join all unique keywords with commas
keywords = ",".join(sorted(all_keywords)) keywords = ",".join(sorted(all_keywords))
source_id = GRAPH_FIELD_SEP.join( source_id = GRAPH_FIELD_SEP.join(source_ids)
set(
[dp["source_id"] for dp in edges_data if dp.get("source_id")] # Build file_path with count limit
+ already_source_ids if skip_summary_due_to_limit:
# Skip limit, keep original file_path
file_path = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp)
else:
# Collect and apply limit
file_paths_list = []
seen_paths = set()
# Get placeholder to filter it out
file_path_placeholder = global_config.get(
"file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
) )
)
file_path = build_file_path(already_file_paths, edges_data, f"{src_id}-{tgt_id}") # Collect from already_file_paths, excluding placeholder
for fp in already_file_paths:
# Skip placeholders (format: "...{placeholder}(showing X of Y)...")
if (
fp
and not fp.startswith(f"...{file_path_placeholder}")
and fp not in seen_paths
):
file_paths_list.append(fp)
seen_paths.add(fp)
# Collect from new data
for dp in edges_data:
file_path_item = dp.get("file_path")
if file_path_item and file_path_item not in seen_paths:
file_paths_list.append(file_path_item)
seen_paths.add(file_path_item)
# Apply count limit
max_file_paths = global_config.get("max_file_paths")
if len(file_paths_list) > max_file_paths:
limit_method = global_config.get(
"source_ids_limit_method", SOURCE_IDS_LIMIT_METHOD_KEEP
)
file_path_placeholder = global_config.get(
"file_path_more_placeholder", DEFAULT_FILE_PATH_MORE_PLACEHOLDER
)
original_count = len(file_paths_list)
if limit_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
# FIFO: keep tail (newest), discard head
file_paths_list = file_paths_list[-max_file_paths:]
else:
# KEEP: keep head (earliest), discard tail
file_paths_list = file_paths_list[:max_file_paths]
file_paths_list.append(
f"...{file_path_placeholder}(showing {max_file_paths} of {original_count})..."
)
logger.info(
f"Limited `{src_id}`~`{tgt_id}`: file_path {original_count} -> {max_file_paths} ({limit_method})"
)
file_path = GRAPH_FIELD_SEP.join(file_paths_list)
for need_insert_id in [src_id, tgt_id]: for need_insert_id in [src_id, tgt_id]:
if not (await knowledge_graph_inst.has_node(need_insert_id)): if not (await knowledge_graph_inst.has_node(need_insert_id)):
@ -1538,6 +2008,7 @@ async def _merge_edges_then_upsert(
"entity_type": "UNKNOWN", "entity_type": "UNKNOWN",
"file_path": file_path, "file_path": file_path,
"created_at": int(time.time()), "created_at": int(time.time()),
"truncate": "",
} }
await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data) await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data)
@ -1563,6 +2034,7 @@ async def _merge_edges_then_upsert(
source_id=source_id, source_id=source_id,
file_path=file_path, file_path=file_path,
created_at=int(time.time()), created_at=int(time.time()),
truncate=truncation_info,
), ),
) )
@ -1574,6 +2046,7 @@ async def _merge_edges_then_upsert(
source_id=source_id, source_id=source_id,
file_path=file_path, file_path=file_path,
created_at=int(time.time()), created_at=int(time.time()),
truncate=truncation_info,
) )
return edge_data return edge_data
@ -1591,6 +2064,8 @@ async def merge_nodes_and_edges(
pipeline_status: dict = None, pipeline_status: dict = None,
pipeline_status_lock=None, pipeline_status_lock=None,
llm_response_cache: BaseKVStorage | None = None, llm_response_cache: BaseKVStorage | None = None,
entity_chunks_storage: BaseKVStorage | None = None,
relation_chunks_storage: BaseKVStorage | None = None,
current_file_number: int = 0, current_file_number: int = 0,
total_files: int = 0, total_files: int = 0,
file_path: str = "unknown_source", file_path: str = "unknown_source",
@ -1614,6 +2089,8 @@ async def merge_nodes_and_edges(
pipeline_status: Pipeline status dictionary pipeline_status: Pipeline status dictionary
pipeline_status_lock: Lock for pipeline status pipeline_status_lock: Lock for pipeline status
llm_response_cache: LLM response cache llm_response_cache: LLM response cache
entity_chunks_storage: Storage tracking full chunk lists per entity
relation_chunks_storage: Storage tracking full chunk lists per relation
current_file_number: Current file number for logging current_file_number: Current file number for logging
total_files: Total files for logging total_files: Total files for logging
file_path: File path for logging file_path: File path for logging
@ -1671,13 +2148,14 @@ async def merge_nodes_and_edges(
pipeline_status, pipeline_status,
pipeline_status_lock, pipeline_status_lock,
llm_response_cache, llm_response_cache,
entity_chunks_storage,
) )
# Vector database operation (equally critical, must succeed) # Vector database operation (equally critical, must succeed)
if entity_vdb is not None and entity_data: if entity_vdb is not None and entity_data:
data_for_vdb = { data_for_vdb = {
compute_mdhash_id( compute_mdhash_id(
entity_data["entity_name"], prefix="ent-" str(entity_data["entity_name"]), prefix="ent-"
): { ): {
"entity_name": entity_data["entity_name"], "entity_name": entity_data["entity_name"],
"entity_type": entity_data["entity_type"], "entity_type": entity_data["entity_type"],
@ -1689,7 +2167,6 @@ async def merge_nodes_and_edges(
} }
} }
logger.debug(f"Inserting {entity_name} in Graph") logger.debug(f"Inserting {entity_name} in Graph")
# Use safe operation wrapper - VDB failure must throw exception # Use safe operation wrapper - VDB failure must throw exception
await safe_vdb_operation_with_exception( await safe_vdb_operation_with_exception(
@ -1804,6 +2281,7 @@ async def merge_nodes_and_edges(
pipeline_status_lock, pipeline_status_lock,
llm_response_cache, llm_response_cache,
added_entities, # Pass list to collect added entities added_entities, # Pass list to collect added entities
relation_chunks_storage,
) )
if edge_data is None: if edge_data is None:
@ -3338,7 +3816,11 @@ async def _build_query_context(
query_embedding=search_result["query_embedding"], query_embedding=search_result["query_embedding"],
) )
if not merged_chunks: if (
not merged_chunks
and not truncation_result["entities_context"]
and not truncation_result["relations_context"]
):
return None return None
# Stage 4: Build final LLM context with dynamic token processing # Stage 4: Build final LLM context with dynamic token processing

View file

@ -15,7 +15,17 @@ from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from functools import wraps from functools import wraps
from hashlib import md5 from hashlib import md5
from typing import Any, Protocol, Callable, TYPE_CHECKING, List, Optional from typing import (
Any,
Protocol,
Callable,
TYPE_CHECKING,
List,
Optional,
Iterable,
Sequence,
Collection,
)
import numpy as np import numpy as np
from dotenv import load_dotenv from dotenv import load_dotenv
@ -25,7 +35,9 @@ from lightrag.constants import (
DEFAULT_LOG_FILENAME, DEFAULT_LOG_FILENAME,
GRAPH_FIELD_SEP, GRAPH_FIELD_SEP,
DEFAULT_MAX_TOTAL_TOKENS, DEFAULT_MAX_TOTAL_TOKENS,
DEFAULT_MAX_FILE_PATH_LENGTH, DEFAULT_SOURCE_IDS_LIMIT_METHOD,
VALID_SOURCE_IDS_LIMIT_METHODS,
SOURCE_IDS_LIMIT_METHOD_FIFO,
) )
# Initialize logger with basic configuration # Initialize logger with basic configuration
@ -2464,82 +2476,111 @@ async def process_chunks_unified(
return final_chunks return final_chunks
def truncate_entity_source_id(chunk_ids: set, entity_name: str, global_config: dict) -> set:
"""Limit chunk_ids, for entities that appear a HUGE no of times (To not break VDB hard upper limits)"""
already_len: int = len(chunk_ids)
max_chunk_ids_per_entity = global_config["max_source_ids_per_entity"] def normalize_source_ids_limit_method(method: str | None) -> str:
"""Normalize the source ID limiting strategy and fall back to default when invalid."""
if already_len <= max_chunk_ids_per_entity: if not method:
return chunk_ids return DEFAULT_SOURCE_IDS_LIMIT_METHOD
logger.warning( normalized = method.upper()
f"Source Ids already exceeds {max_chunk_ids_per_entity } for {entity_name}, " if normalized not in VALID_SOURCE_IDS_LIMIT_METHODS:
f"current size: {already_len}, truncating..."
)
truncated_chunk_ids = set(list(chunk_ids)[0:max_chunk_ids_per_entity ])
return truncated_chunk_ids
def build_file_path(already_file_paths, data_list, target):
"""Build file path string with UTF-8 byte length limit and deduplication
Args:
already_file_paths: List of existing file paths
data_list: List of data items containing file_path
target: Target name for logging warnings
Returns:
str: Combined file paths separated by GRAPH_FIELD_SEP
"""
# set: deduplication
file_paths_set = {fp for fp in already_file_paths if fp}
# string: filter empty value and keep file order in already_file_paths
file_paths = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp)
# Check if initial file_paths already exceeds byte length limit
if len(file_paths.encode("utf-8")) >= DEFAULT_MAX_FILE_PATH_LENGTH:
logger.warning( logger.warning(
f"Initial file_paths already exceeds {DEFAULT_MAX_FILE_PATH_LENGTH} bytes for {target}, " "Unknown SOURCE_IDS_LIMIT_METHOD '%s', falling back to %s",
f"current size: {len(file_paths.encode('utf-8'))} bytes" method,
DEFAULT_SOURCE_IDS_LIMIT_METHOD,
)
return DEFAULT_SOURCE_IDS_LIMIT_METHOD
return normalized
def merge_source_ids(
existing_ids: Iterable[str] | None, new_ids: Iterable[str] | None
) -> list[str]:
"""Merge two iterables of source IDs while preserving order and removing duplicates."""
merged: list[str] = []
seen: set[str] = set()
for sequence in (existing_ids, new_ids):
if not sequence:
continue
for source_id in sequence:
if not source_id:
continue
if source_id not in seen:
seen.add(source_id)
merged.append(source_id)
return merged
def apply_source_ids_limit(
source_ids: Sequence[str],
limit: int,
method: str,
*,
identifier: str | None = None,
) -> list[str]:
"""Apply a limit strategy to a sequence of source IDs."""
if limit <= 0:
return []
source_ids_list = list(source_ids)
if len(source_ids_list) <= limit:
return source_ids_list
normalized_method = normalize_source_ids_limit_method(method)
if normalized_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
truncated = source_ids_list[-limit:]
else: # IGNORE_NEW
truncated = source_ids_list[:limit]
if identifier and len(truncated) < len(source_ids_list):
logger.debug(
"Source_id truncated: %s | %s keeping %s of %s entries",
identifier,
normalized_method,
len(truncated),
len(source_ids_list),
) )
# ignored file_paths return truncated
file_paths_ignore = ""
# add file_paths
for dp in data_list:
cur_file_path = dp.get("file_path")
# empty
if not cur_file_path:
continue
# skip duplicate item
if cur_file_path in file_paths_set:
continue
# add
file_paths_set.add(cur_file_path)
# check the UTF-8 byte length def subtract_source_ids(
new_addition = GRAPH_FIELD_SEP + cur_file_path if file_paths else cur_file_path source_ids: Iterable[str],
if ( ids_to_remove: Collection[str],
len(file_paths.encode("utf-8")) + len(new_addition.encode("utf-8")) ) -> list[str]:
< DEFAULT_MAX_FILE_PATH_LENGTH - 5 """Remove a collection of IDs from an ordered iterable while preserving order."""
):
# append
file_paths += new_addition
else:
# ignore
file_paths_ignore += GRAPH_FIELD_SEP + cur_file_path
if file_paths_ignore: removal_set = set(ids_to_remove)
logger.warning( if not removal_set:
f"File paths exceed {DEFAULT_MAX_FILE_PATH_LENGTH} bytes for {target}, " return [source_id for source_id in source_ids if source_id]
f"ignoring file path: {file_paths_ignore}"
) return [
return file_paths source_id
for source_id in source_ids
if source_id and source_id not in removal_set
]
def make_relation_chunk_key(src: str, tgt: str) -> str:
"""Create a deterministic storage key for relation chunk tracking."""
return GRAPH_FIELD_SEP.join(sorted((src, tgt)))
def parse_relation_chunk_key(key: str) -> tuple[str, str]:
"""Parse a relation chunk storage key back into its entity pair."""
parts = key.split(GRAPH_FIELD_SEP)
if len(parts) != 2:
raise ValueError(f"Invalid relation chunk key: {key}")
return parts[0], parts[1]
def generate_track_id(prefix: str = "upload") -> str: def generate_track_id(prefix: str = "upload") -> str: