Add entity/relation chunk tracking with configurable source ID limits

- Add entity_chunks & relation_chunks storage
- Implement KEEP/FIFO limit strategies
- Update env.example with new settings
- Add migration for chunk tracking data
- Support all KV storage backends
yangdx 2025-10-20 15:24:15 +08:00
parent bdadaa6750
commit dc62c78f98
13 changed files with 1098 additions and 290 deletions
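For readers skimming the diff, here is a minimal sketch of the KEEP/FIFO limiting behavior this commit introduces; the function name mirrors the apply_source_ids_limit calls that appear later in the diff, but the body below is an illustration of the intended semantics under stated assumptions, not the shipped implementation.
# Sketch only (assumed semantics): KEEP ignores chunk ids beyond the cap,
# FIFO keeps the newest ids and drops the oldest.
def apply_source_ids_limit(chunk_ids: list[str], max_ids: int, method: str = "KEEP") -> list[str]:
    if max_ids <= 0 or len(chunk_ids) <= max_ids:
        return list(chunk_ids)
    if method == "FIFO":
        return list(chunk_ids[-max_ids:])  # newest entries replace the oldest
    return list(chunk_ids[:max_ids])  # KEEP: ignore new chunks past the cap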


@ -73,8 +73,11 @@ ENABLE_LLM_CACHE=true
# MAX_RELATION_TOKENS=8000
### control the maximum tokens send to LLM (include entities, relations and chunks)
# MAX_TOTAL_TOKENS=30000
### control the maximum chunk_ids stored
### control the maximum chunk_ids stored in vector and graph db
# MAX_SOURCE_IDS_PER_ENTITY=500
# MAX_SOURCE_IDS_PER_ENTITY=300
# MAX_SOURCE_IDS_PER_RELATION=300
### control chunk_ids limitation method: KEEP, FIFO (KEEP: ignore new chunks, FIFO: new chunks replace old chunks)
# SOURCE_IDS_LIMIT_METHOD=KEEP
### maximum number of related chunks per source entity or relation
### The chunk picker uses this value to determine the total number of chunks selected from KG(knowledge graph)


@ -2003,6 +2003,8 @@ def create_document_routes(
rag.full_docs,
rag.full_entities,
rag.full_relations,
rag.entity_chunks,
rag.relation_chunks,
rag.entities_vdb,
rag.relationships_vdb,
rag.chunks_vdb,


@ -355,6 +355,14 @@ class BaseKVStorage(StorageNameSpace, ABC):
None
"""
@abstractmethod
async def is_empty(self) -> bool:
"""Check if the storage is empty
Returns:
bool: True if storage contains no data, False otherwise
"""
@dataclass
class BaseGraphStorage(StorageNameSpace, ABC):


@ -13,7 +13,16 @@ DEFAULT_MAX_GRAPH_NODES = 1000
# Default values for extraction settings
DEFAULT_SUMMARY_LANGUAGE = "English" # Default language for document processing
DEFAULT_MAX_GLEANING = 1
DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 500 # Applies to Both Graph + Vector DBs
DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 3
DEFAULT_MAX_SOURCE_IDS_PER_RELATION = 3
SOURCE_IDS_LIMIT_METHOD_KEEP = "KEEP"
SOURCE_IDS_LIMIT_METHOD_FIFO = "FIFO"
DEFAULT_SOURCE_IDS_LIMIT_METHOD = SOURCE_IDS_LIMIT_METHOD_KEEP
VALID_SOURCE_IDS_LIMIT_METHODS = {
SOURCE_IDS_LIMIT_METHOD_KEEP,
SOURCE_IDS_LIMIT_METHOD_FIFO,
}
# Number of description fragments to trigger LLM summary
DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 8
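The normalize_source_ids_limit_method helper imported by lightrag.py later in this diff is not shown here; a plausible sketch, assuming it simply validates the env value against VALID_SOURCE_IDS_LIMIT_METHODS and falls back to the default:
# Assumed helper (not part of this diff): coerce an env value to a valid limit method.
def normalize_source_ids_limit_method(value: str) -> str:
    method = (value or "").strip().upper()
    return method if method in VALID_SOURCE_IDS_LIMIT_METHODS else DEFAULT_SOURCE_IDS_LIMIT_METHOD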


@ -187,6 +187,20 @@ class JsonDocStatusStorage(DocStatusStorage):
await self.index_done_callback()
async def is_empty(self) -> bool:
"""Check if the storage is empty
Returns:
bool: True if storage is empty, False otherwise
Raises:
StorageNotInitializedError: If storage is not initialized
"""
if self._storage_lock is None:
raise StorageNotInitializedError("JsonDocStatusStorage")
async with self._storage_lock:
return len(self._data) == 0
async def get_by_id(self, id: str) -> Union[dict[str, Any], None]:
async with self._storage_lock:
return self._data.get(id)


@ -84,26 +84,6 @@ class JsonKVStorage(BaseKVStorage):
write_json(data_dict, self._file_name)
await clear_all_update_flags(self.final_namespace)
async def get_all(self) -> dict[str, Any]:
"""Get all data from storage
Returns:
Dictionary containing all stored data
"""
async with self._storage_lock:
result = {}
for key, value in self._data.items():
if value:
# Create a copy to avoid modifying the original data
data = dict(value)
# Ensure time fields are present, provide default values for old data
data.setdefault("create_time", 0)
data.setdefault("update_time", 0)
result[key] = data
else:
result[key] = value
return result
async def get_by_id(self, id: str) -> dict[str, Any] | None:
async with self._storage_lock:
result = self._data.get(id)
@ -200,6 +180,15 @@ class JsonKVStorage(BaseKVStorage):
if any_deleted:
await set_all_update_flags(self.final_namespace)
async def is_empty(self) -> bool:
"""Check if the storage is empty
Returns:
bool: True if storage contains no data, False otherwise
"""
async with self._storage_lock:
return len(self._data) == 0
async def drop(self) -> dict[str, str]:
"""Drop all data from storage and clean up resources
This action will persistent the data to disk immediately.


@ -175,22 +175,6 @@ class MongoKVStorage(BaseKVStorage):
existing_ids = {str(x["_id"]) async for x in cursor}
return keys - existing_ids
async def get_all(self) -> dict[str, Any]:
"""Get all data from storage
Returns:
Dictionary containing all stored data
"""
cursor = self._data.find({})
result = {}
async for doc in cursor:
doc_id = doc.pop("_id")
# Ensure time fields are present for all documents
doc.setdefault("create_time", 0)
doc.setdefault("update_time", 0)
result[doc_id] = doc
return result
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
logger.debug(f"[{self.workspace}] Inserting {len(data)} to {self.namespace}")
if not data:
@ -236,6 +220,20 @@ class MongoKVStorage(BaseKVStorage):
# Mongo handles persistence automatically
pass
async def is_empty(self) -> bool:
"""Check if the storage is empty for the current workspace and namespace
Returns:
bool: True if storage is empty, False otherwise
"""
try:
# Use count_documents with limit 1 for efficiency
count = await self._data.count_documents({}, limit=1)
return count == 0
except PyMongoError as e:
logger.error(f"[{self.workspace}] Error checking if storage is empty: {e}")
return True
async def delete(self, ids: list[str]) -> None:
"""Delete documents with specified IDs
@ -466,6 +464,20 @@ class MongoDocStatusStorage(DocStatusStorage):
# Mongo handles persistence automatically
pass
async def is_empty(self) -> bool:
"""Check if the storage is empty for the current workspace and namespace
Returns:
bool: True if storage is empty, False otherwise
"""
try:
# Use count_documents with limit 1 for efficiency
count = await self._data.count_documents({}, limit=1)
return count == 0
except PyMongoError as e:
logger.error(f"[{self.workspace}] Error checking if storage is empty: {e}")
return True
async def drop(self) -> dict[str, str]:
"""Drop the storage by removing all documents in the collection.


@ -1656,113 +1656,6 @@ class PGKVStorage(BaseKVStorage):
self.db = None
################ QUERY METHODS ################
async def get_all(self) -> dict[str, Any]:
"""Get all data from storage
Returns:
Dictionary containing all stored data
"""
table_name = namespace_to_table_name(self.namespace)
if not table_name:
logger.error(
f"[{self.workspace}] Unknown namespace for get_all: {self.namespace}"
)
return {}
sql = f"SELECT * FROM {table_name} WHERE workspace=$1"
params = {"workspace": self.workspace}
try:
results = await self.db.query(sql, list(params.values()), multirows=True)
# Special handling for LLM cache to ensure compatibility with _get_cached_extraction_results
if is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE):
processed_results = {}
for row in results:
create_time = row.get("create_time", 0)
update_time = row.get("update_time", 0)
# Map field names and add cache_type for compatibility
processed_row = {
**row,
"return": row.get("return_value", ""),
"cache_type": row.get("original_prompt", "unknow"),
"original_prompt": row.get("original_prompt", ""),
"chunk_id": row.get("chunk_id"),
"mode": row.get("mode", "default"),
"create_time": create_time,
"update_time": create_time if update_time == 0 else update_time,
}
processed_results[row["id"]] = processed_row
return processed_results
# For text_chunks namespace, parse llm_cache_list JSON string back to list
if is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
processed_results = {}
for row in results:
llm_cache_list = row.get("llm_cache_list", [])
if isinstance(llm_cache_list, str):
try:
llm_cache_list = json.loads(llm_cache_list)
except json.JSONDecodeError:
llm_cache_list = []
row["llm_cache_list"] = llm_cache_list
create_time = row.get("create_time", 0)
update_time = row.get("update_time", 0)
row["create_time"] = create_time
row["update_time"] = (
create_time if update_time == 0 else update_time
)
processed_results[row["id"]] = row
return processed_results
# For FULL_ENTITIES namespace, parse entity_names JSON string back to list
if is_namespace(self.namespace, NameSpace.KV_STORE_FULL_ENTITIES):
processed_results = {}
for row in results:
entity_names = row.get("entity_names", [])
if isinstance(entity_names, str):
try:
entity_names = json.loads(entity_names)
except json.JSONDecodeError:
entity_names = []
row["entity_names"] = entity_names
create_time = row.get("create_time", 0)
update_time = row.get("update_time", 0)
row["create_time"] = create_time
row["update_time"] = (
create_time if update_time == 0 else update_time
)
processed_results[row["id"]] = row
return processed_results
# For FULL_RELATIONS namespace, parse relation_pairs JSON string back to list
if is_namespace(self.namespace, NameSpace.KV_STORE_FULL_RELATIONS):
processed_results = {}
for row in results:
relation_pairs = row.get("relation_pairs", [])
if isinstance(relation_pairs, str):
try:
relation_pairs = json.loads(relation_pairs)
except json.JSONDecodeError:
relation_pairs = []
row["relation_pairs"] = relation_pairs
create_time = row.get("create_time", 0)
update_time = row.get("update_time", 0)
row["create_time"] = create_time
row["update_time"] = (
create_time if update_time == 0 else update_time
)
processed_results[row["id"]] = row
return processed_results
# For other namespaces, return as-is
return {row["id"]: row for row in results}
except Exception as e:
logger.error(
f"[{self.workspace}] Error retrieving all data from {self.namespace}: {e}"
)
return {}
async def get_by_id(self, id: str) -> dict[str, Any] | None:
"""Get data by id."""
sql = SQL_TEMPLATES["get_by_id_" + self.namespace]
@ -1838,6 +1731,38 @@ class PGKVStorage(BaseKVStorage):
response["create_time"] = create_time response["create_time"] = create_time
response["update_time"] = create_time if update_time == 0 else update_time response["update_time"] = create_time if update_time == 0 else update_time
# Special handling for ENTITY_CHUNKS namespace
if response and is_namespace(self.namespace, NameSpace.KV_STORE_ENTITY_CHUNKS):
# Parse chunk_ids JSON string back to list
chunk_ids = response.get("chunk_ids", [])
if isinstance(chunk_ids, str):
try:
chunk_ids = json.loads(chunk_ids)
except json.JSONDecodeError:
chunk_ids = []
response["chunk_ids"] = chunk_ids
create_time = response.get("create_time", 0)
update_time = response.get("update_time", 0)
response["create_time"] = create_time
response["update_time"] = create_time if update_time == 0 else update_time
# Special handling for RELATION_CHUNKS namespace
if response and is_namespace(
self.namespace, NameSpace.KV_STORE_RELATION_CHUNKS
):
# Parse chunk_ids JSON string back to list
chunk_ids = response.get("chunk_ids", [])
if isinstance(chunk_ids, str):
try:
chunk_ids = json.loads(chunk_ids)
except json.JSONDecodeError:
chunk_ids = []
response["chunk_ids"] = chunk_ids
create_time = response.get("create_time", 0)
update_time = response.get("update_time", 0)
response["create_time"] = create_time
response["update_time"] = create_time if update_time == 0 else update_time
return response if response else None
# Query by id
@ -1946,6 +1871,38 @@ class PGKVStorage(BaseKVStorage):
result["create_time"] = create_time result["create_time"] = create_time
result["update_time"] = create_time if update_time == 0 else update_time result["update_time"] = create_time if update_time == 0 else update_time
# Special handling for ENTITY_CHUNKS namespace
if results and is_namespace(self.namespace, NameSpace.KV_STORE_ENTITY_CHUNKS):
for result in results:
# Parse chunk_ids JSON string back to list
chunk_ids = result.get("chunk_ids", [])
if isinstance(chunk_ids, str):
try:
chunk_ids = json.loads(chunk_ids)
except json.JSONDecodeError:
chunk_ids = []
result["chunk_ids"] = chunk_ids
create_time = result.get("create_time", 0)
update_time = result.get("update_time", 0)
result["create_time"] = create_time
result["update_time"] = create_time if update_time == 0 else update_time
# Special handling for RELATION_CHUNKS namespace
if results and is_namespace(self.namespace, NameSpace.KV_STORE_RELATION_CHUNKS):
for result in results:
# Parse chunk_ids JSON string back to list
chunk_ids = result.get("chunk_ids", [])
if isinstance(chunk_ids, str):
try:
chunk_ids = json.loads(chunk_ids)
except json.JSONDecodeError:
chunk_ids = []
result["chunk_ids"] = chunk_ids
create_time = result.get("create_time", 0)
update_time = result.get("update_time", 0)
result["create_time"] = create_time
result["update_time"] = create_time if update_time == 0 else update_time
return _order_results(results)
async def filter_keys(self, keys: set[str]) -> set[str]:
@ -2050,11 +2007,61 @@ class PGKVStorage(BaseKVStorage):
"update_time": current_time, "update_time": current_time,
} }
await self.db.execute(upsert_sql, _data) await self.db.execute(upsert_sql, _data)
elif is_namespace(self.namespace, NameSpace.KV_STORE_ENTITY_CHUNKS):
# Get current UTC time and convert to naive datetime for database storage
current_time = datetime.datetime.now(timezone.utc).replace(tzinfo=None)
for k, v in data.items():
upsert_sql = SQL_TEMPLATES["upsert_entity_chunks"]
_data = {
"workspace": self.workspace,
"id": k,
"chunk_ids": json.dumps(v["chunk_ids"]),
"count": v["count"],
"create_time": current_time,
"update_time": current_time,
}
await self.db.execute(upsert_sql, _data)
elif is_namespace(self.namespace, NameSpace.KV_STORE_RELATION_CHUNKS):
# Get current UTC time and convert to naive datetime for database storage
current_time = datetime.datetime.now(timezone.utc).replace(tzinfo=None)
for k, v in data.items():
upsert_sql = SQL_TEMPLATES["upsert_relation_chunks"]
_data = {
"workspace": self.workspace,
"id": k,
"chunk_ids": json.dumps(v["chunk_ids"]),
"count": v["count"],
"create_time": current_time,
"update_time": current_time,
}
await self.db.execute(upsert_sql, _data)
async def index_done_callback(self) -> None:
# PG handles persistence automatically
pass
async def is_empty(self) -> bool:
"""Check if the storage is empty for the current workspace and namespace
Returns:
bool: True if storage is empty, False otherwise
"""
table_name = namespace_to_table_name(self.namespace)
if not table_name:
logger.error(
f"[{self.workspace}] Unknown namespace for is_empty check: {self.namespace}"
)
return True
sql = f"SELECT EXISTS(SELECT 1 FROM {table_name} WHERE workspace=$1 LIMIT 1) as has_data"
try:
result = await self.db.query(sql, [self.workspace])
return not result.get("has_data", False) if result else True
except Exception as e:
logger.error(f"[{self.workspace}] Error checking if storage is empty: {e}")
return True
async def delete(self, ids: list[str]) -> None:
"""Delete specific records from storage by their IDs
@ -2970,6 +2977,28 @@ class PGDocStatusStorage(DocStatusStorage):
# PG handles persistence automatically
pass
async def is_empty(self) -> bool:
"""Check if the storage is empty for the current workspace and namespace
Returns:
bool: True if storage is empty, False otherwise
"""
table_name = namespace_to_table_name(self.namespace)
if not table_name:
logger.error(
f"[{self.workspace}] Unknown namespace for is_empty check: {self.namespace}"
)
return True
sql = f"SELECT EXISTS(SELECT 1 FROM {table_name} WHERE workspace=$1 LIMIT 1) as has_data"
try:
result = await self.db.query(sql, [self.workspace])
return not result.get("has_data", False) if result else True
except Exception as e:
logger.error(f"[{self.workspace}] Error checking if storage is empty: {e}")
return True
async def delete(self, ids: list[str]) -> None:
"""Delete specific records from storage by their IDs
@ -4721,6 +4750,8 @@ NAMESPACE_TABLE_MAP = {
NameSpace.KV_STORE_TEXT_CHUNKS: "LIGHTRAG_DOC_CHUNKS",
NameSpace.KV_STORE_FULL_ENTITIES: "LIGHTRAG_FULL_ENTITIES",
NameSpace.KV_STORE_FULL_RELATIONS: "LIGHTRAG_FULL_RELATIONS",
NameSpace.KV_STORE_ENTITY_CHUNKS: "LIGHTRAG_ENTITY_CHUNKS",
NameSpace.KV_STORE_RELATION_CHUNKS: "LIGHTRAG_RELATION_CHUNKS",
NameSpace.KV_STORE_LLM_RESPONSE_CACHE: "LIGHTRAG_LLM_CACHE",
NameSpace.VECTOR_STORE_CHUNKS: "LIGHTRAG_VDB_CHUNKS",
NameSpace.VECTOR_STORE_ENTITIES: "LIGHTRAG_VDB_ENTITY",
@ -4861,6 +4892,28 @@ TABLES = {
CONSTRAINT LIGHTRAG_FULL_RELATIONS_PK PRIMARY KEY (workspace, id)
)"""
},
"LIGHTRAG_ENTITY_CHUNKS": {
"ddl": """CREATE TABLE LIGHTRAG_ENTITY_CHUNKS (
id VARCHAR(512),
workspace VARCHAR(255),
chunk_ids JSONB,
count INTEGER,
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_ENTITY_CHUNKS_PK PRIMARY KEY (workspace, id)
)"""
},
"LIGHTRAG_RELATION_CHUNKS": {
"ddl": """CREATE TABLE LIGHTRAG_RELATION_CHUNKS (
id VARCHAR(512),
workspace VARCHAR(255),
chunk_ids JSONB,
count INTEGER,
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_RELATION_CHUNKS_PK PRIMARY KEY (workspace, id)
)"""
},
}
@ -4918,6 +4971,26 @@ SQL_TEMPLATES = {
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_FULL_RELATIONS WHERE workspace=$1 AND id = ANY($2)
""",
"get_by_id_entity_chunks": """SELECT id, chunk_ids, count,
EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_ENTITY_CHUNKS WHERE workspace=$1 AND id=$2
""",
"get_by_id_relation_chunks": """SELECT id, chunk_ids, count,
EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_RELATION_CHUNKS WHERE workspace=$1 AND id=$2
""",
"get_by_ids_entity_chunks": """SELECT id, chunk_ids, count,
EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_ENTITY_CHUNKS WHERE workspace=$1 AND id = ANY($2)
""",
"get_by_ids_relation_chunks": """SELECT id, chunk_ids, count,
EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_RELATION_CHUNKS WHERE workspace=$1 AND id = ANY($2)
""",
"filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})", "filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})",
"upsert_doc_full": """INSERT INTO LIGHTRAG_DOC_FULL (id, content, doc_name, workspace) "upsert_doc_full": """INSERT INTO LIGHTRAG_DOC_FULL (id, content, doc_name, workspace)
VALUES ($1, $2, $3, $4) VALUES ($1, $2, $3, $4)
@ -4965,6 +5038,22 @@ SQL_TEMPLATES = {
count=EXCLUDED.count,
update_time = EXCLUDED.update_time
""", """,
"upsert_entity_chunks": """INSERT INTO LIGHTRAG_ENTITY_CHUNKS (workspace, id, chunk_ids, count,
create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (workspace,id) DO UPDATE
SET chunk_ids=EXCLUDED.chunk_ids,
count=EXCLUDED.count,
update_time = EXCLUDED.update_time
""",
"upsert_relation_chunks": """INSERT INTO LIGHTRAG_RELATION_CHUNKS (workspace, id, chunk_ids, count,
create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (workspace,id) DO UPDATE
SET chunk_ids=EXCLUDED.chunk_ids,
count=EXCLUDED.count,
update_time = EXCLUDED.update_time
""",
# SQL for VectorStorage
"upsert_chunk": """INSERT INTO LIGHTRAG_VDB_CHUNKS (workspace, id, tokens,
chunk_order_index, full_doc_id, content, content_vector, file_path,
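For context, each record in the new entity_chunks/relation_chunks namespaces carries the chunk_ids list plus a count, matching the upsert_entity_chunks/upsert_relation_chunks parameters above. A hedged usage sketch (rag is assumed to be an initialized LightRAG instance; make_relation_chunk_key is assumed, as its definition is not part of this diff):
# Sketch: record shape persisted to LIGHTRAG_ENTITY_CHUNKS / LIGHTRAG_RELATION_CHUNKS.
await rag.entity_chunks.upsert(
    {"Alan Turing": {"chunk_ids": ["chunk-1", "chunk-2"], "count": 2}}
)
relation_key = make_relation_chunk_key("Alan Turing", "Enigma")  # assumed helper
await rag.relation_chunks.upsert(
    {relation_key: {"chunk_ids": ["chunk-2"], "count": 1}}
)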


@ -304,51 +304,6 @@ class RedisKVStorage(BaseKVStorage):
logger.error(f"[{self.workspace}] JSON decode error in batch get: {e}") logger.error(f"[{self.workspace}] JSON decode error in batch get: {e}")
return [None] * len(ids) return [None] * len(ids)
async def get_all(self) -> dict[str, Any]:
"""Get all data from storage
Returns:
Dictionary containing all stored data
"""
async with self._get_redis_connection() as redis:
try:
# Get all keys for this namespace
keys = await redis.keys(f"{self.final_namespace}:*")
if not keys:
return {}
# Get all values in batch
pipe = redis.pipeline()
for key in keys:
pipe.get(key)
values = await pipe.execute()
# Build result dictionary
result = {}
for key, value in zip(keys, values):
if value:
# Extract the ID part (after namespace:)
key_id = key.split(":", 1)[1]
try:
data = json.loads(value)
# Ensure time fields are present for all documents
data.setdefault("create_time", 0)
data.setdefault("update_time", 0)
result[key_id] = data
except json.JSONDecodeError as e:
logger.error(
f"[{self.workspace}] JSON decode error for key {key}: {e}"
)
continue
return result
except Exception as e:
logger.error(
f"[{self.workspace}] Error getting all data from Redis: {e}"
)
return {}
async def filter_keys(self, keys: set[str]) -> set[str]:
async with self._get_redis_connection() as redis:
pipe = redis.pipeline()
@ -407,8 +362,24 @@ class RedisKVStorage(BaseKVStorage):
# Redis handles persistence automatically
pass
async def is_empty(self) -> bool:
"""Check if the storage is empty for the current workspace and namespace
Returns:
bool: True if storage is empty, False otherwise
"""
pattern = f"{self.namespace}:{self.workspace}:*"
try:
# Use scan to check if any keys exist
async for key in self.redis.scan_iter(match=pattern, count=1):
return False # Found at least one key
return True # No keys found
except Exception as e:
logger.error(f"[{self.workspace}] Error checking if storage is empty: {e}")
return True
async def delete(self, ids: list[str]) -> None:
"""Delete entries with specified IDs"""
"""Delete specific records from storage by their IDs"""
if not ids:
return
@ -868,6 +839,23 @@ class RedisDocStatusStorage(DocStatusStorage):
"""Redis handles persistence automatically""" """Redis handles persistence automatically"""
pass pass
async def is_empty(self) -> bool:
"""Check if the storage is empty for the current workspace and namespace
Returns:
bool: True if storage is empty, False otherwise
"""
pattern = f"{self.final_namespace}:*"
try:
async with self._get_redis_connection() as redis:
# Use scan to check if any keys exist
async for key in redis.scan_iter(match=pattern, count=1):
return False # Found at least one key
return True # No keys found
except Exception as e:
logger.error(f"[{self.workspace}] Error checking if storage is empty: {e}")
return True
@redis_retry
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
"""Insert or update document status data"""


@ -41,10 +41,12 @@ from lightrag.constants import (
DEFAULT_MAX_PARALLEL_INSERT,
DEFAULT_MAX_GRAPH_NODES,
DEFAULT_MAX_SOURCE_IDS_PER_ENTITY,
DEFAULT_MAX_SOURCE_IDS_PER_RELATION,
DEFAULT_ENTITY_TYPES,
DEFAULT_SUMMARY_LANGUAGE,
DEFAULT_LLM_TIMEOUT,
DEFAULT_EMBEDDING_TIMEOUT,
DEFAULT_SOURCE_IDS_LIMIT_METHOD,
)
from lightrag.utils import get_env_value
@ -99,6 +101,9 @@ from lightrag.utils import (
generate_track_id,
convert_to_user_format,
logger,
subtract_source_ids,
make_relation_chunk_key,
normalize_source_ids_limit_method,
)
from lightrag.types import KnowledgeGraph
from dotenv import load_dotenv
@ -362,10 +367,32 @@ class LightRAG:
"""Maximum number of graph nodes to return in knowledge graph queries.""" """Maximum number of graph nodes to return in knowledge graph queries."""
max_source_ids_per_entity: int = field( max_source_ids_per_entity: int = field(
default=get_env_value("MAX_SOURCE_IDS_PER_ENTITY", DEFAULT_MAX_SOURCE_IDS_PER_ENTITY, int) default=get_env_value(
"MAX_SOURCE_IDS_PER_ENTITY", DEFAULT_MAX_SOURCE_IDS_PER_ENTITY, int
)
) )
"""Maximum number of source (chunk) ids in entity Grpah + VDB.""" """Maximum number of source (chunk) ids in entity Grpah + VDB."""
max_source_ids_per_relation: int = field(
default=get_env_value(
"MAX_SOURCE_IDS_PER_RELATION",
DEFAULT_MAX_SOURCE_IDS_PER_RELATION,
int,
)
)
"""Maximum number of source (chunk) ids in relation Graph + VDB."""
source_ids_limit_method: str = field(
default_factory=lambda: normalize_source_ids_limit_method(
get_env_value(
"SOURCE_IDS_LIMIT_METHOD",
DEFAULT_SOURCE_IDS_LIMIT_METHOD,
str,
)
)
)
"""Strategy for enforcing source_id limits: IGNORE_NEW or FIFO."""
addon_params: dict[str, Any] = field(
default_factory=lambda: {
"language": get_env_value(
@ -535,6 +562,18 @@ class LightRAG:
embedding_func=self.embedding_func,
)
self.entity_chunks: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore
namespace=NameSpace.KV_STORE_ENTITY_CHUNKS,
workspace=self.workspace,
embedding_func=self.embedding_func,
)
self.relation_chunks: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore
namespace=NameSpace.KV_STORE_RELATION_CHUNKS,
workspace=self.workspace,
embedding_func=self.embedding_func,
)
self.chunk_entity_relation_graph: BaseGraphStorage = self.graph_storage_cls( # type: ignore
namespace=NameSpace.GRAPH_STORE_CHUNK_ENTITY_RELATION,
workspace=self.workspace,
@ -594,6 +633,8 @@ class LightRAG:
self.text_chunks,
self.full_entities,
self.full_relations,
self.entity_chunks,
self.relation_chunks,
self.entities_vdb,
self.relationships_vdb,
self.chunks_vdb,
@ -616,6 +657,8 @@ class LightRAG:
("text_chunks", self.text_chunks), ("text_chunks", self.text_chunks),
("full_entities", self.full_entities), ("full_entities", self.full_entities),
("full_relations", self.full_relations), ("full_relations", self.full_relations),
("entity_chunks", self.entity_chunks),
("relation_chunks", self.relation_chunks),
("entities_vdb", self.entities_vdb), ("entities_vdb", self.entities_vdb),
("relationships_vdb", self.relationships_vdb), ("relationships_vdb", self.relationships_vdb),
("chunks_vdb", self.chunks_vdb), ("chunks_vdb", self.chunks_vdb),
@ -671,6 +714,13 @@ class LightRAG:
logger.debug("No entities found in graph, skipping migration check") logger.debug("No entities found in graph, skipping migration check")
return return
try:
# Initialize chunk tracking storage after migration
await self._migrate_chunk_tracking_storage()
except Exception as e:
logger.error(f"Error during chunk_tracking migration: {e}")
raise e
# Check if full_entities and full_relations are empty
# Get all processed documents to check their entity/relation data
try:
@ -711,11 +761,11 @@ class LightRAG:
except Exception as e:
logger.error(f"Error during migration check: {e}")
raise e
except Exception as e:
logger.error(f"Error in data migration check: {e}")
raise e
async def _migrate_entity_relation_data(self, processed_docs: dict):
"""Migrate existing entity and relation data to full_entities and full_relations storage"""
@ -814,6 +864,140 @@ class LightRAG:
f"Data migration completed: migrated {migration_count} documents with entities/relations" f"Data migration completed: migrated {migration_count} documents with entities/relations"
) )
async def _migrate_chunk_tracking_storage(self) -> None:
"""Ensure entity/relation chunk tracking KV stores exist and are seeded."""
if not self.entity_chunks or not self.relation_chunks:
return
need_entity_migration = False
need_relation_migration = False
try:
need_entity_migration = await self.entity_chunks.is_empty()
except Exception as exc: # pragma: no cover - defensive logging
logger.error(f"Failed to check entity chunks storage: {exc}")
need_entity_migration = True
try:
need_relation_migration = await self.relation_chunks.is_empty()
except Exception as exc: # pragma: no cover - defensive logging
logger.error(f"Failed to check relation chunks storage: {exc}")
need_relation_migration = True
if not need_entity_migration and not need_relation_migration:
return
BATCH_SIZE = 500 # Process 500 records per batch
if need_entity_migration:
try:
nodes = await self.chunk_entity_relation_graph.get_all_nodes()
except Exception as exc:
logger.error(f"Failed to fetch nodes for chunk migration: {exc}")
nodes = []
logger.info(f"Starting chunk_tracking data migration: {len(nodes)} nodes")
# Process nodes in batches
total_nodes = len(nodes)
total_batches = (total_nodes + BATCH_SIZE - 1) // BATCH_SIZE
total_migrated = 0
for batch_idx in range(total_batches):
start_idx = batch_idx * BATCH_SIZE
end_idx = min((batch_idx + 1) * BATCH_SIZE, total_nodes)
batch_nodes = nodes[start_idx:end_idx]
upsert_payload: dict[str, dict[str, object]] = {}
for node in batch_nodes:
entity_id = node.get("entity_id") or node.get("id")
if not entity_id:
continue
raw_source = node.get("source_id") or ""
chunk_ids = [
chunk_id
for chunk_id in raw_source.split(GRAPH_FIELD_SEP)
if chunk_id
]
if not chunk_ids:
continue
upsert_payload[entity_id] = {
"chunk_ids": chunk_ids,
"count": len(chunk_ids),
}
if upsert_payload:
await self.entity_chunks.upsert(upsert_payload)
total_migrated += len(upsert_payload)
logger.info(
f"Processed entity batch {batch_idx + 1}/{total_batches}: {len(upsert_payload)} records (total: {total_migrated}/{total_nodes})"
)
if total_migrated > 0:
# Persist entity_chunks data to disk
await self.entity_chunks.index_done_callback()
logger.info(
f"Entity chunk_tracking migration completed: {total_migrated} records persisted"
)
if need_relation_migration:
try:
edges = await self.chunk_entity_relation_graph.get_all_edges()
except Exception as exc:
logger.error(f"Failed to fetch edges for chunk migration: {exc}")
edges = []
logger.info(f"Starting chunk_tracking data migration: {len(edges)} edges")
# Process edges in batches
total_edges = len(edges)
total_batches = (total_edges + BATCH_SIZE - 1) // BATCH_SIZE
total_migrated = 0
for batch_idx in range(total_batches):
start_idx = batch_idx * BATCH_SIZE
end_idx = min((batch_idx + 1) * BATCH_SIZE, total_edges)
batch_edges = edges[start_idx:end_idx]
upsert_payload: dict[str, dict[str, object]] = {}
for edge in batch_edges:
src = edge.get("source") or edge.get("src_id") or edge.get("src")
tgt = edge.get("target") or edge.get("tgt_id") or edge.get("tgt")
if not src or not tgt:
continue
raw_source = edge.get("source_id") or ""
chunk_ids = [
chunk_id
for chunk_id in raw_source.split(GRAPH_FIELD_SEP)
if chunk_id
]
if not chunk_ids:
continue
storage_key = make_relation_chunk_key(src, tgt)
upsert_payload[storage_key] = {
"chunk_ids": chunk_ids,
"count": len(chunk_ids),
}
if upsert_payload:
await self.relation_chunks.upsert(upsert_payload)
total_migrated += len(upsert_payload)
logger.info(
f"Processed relation batch {batch_idx + 1}/{total_batches}: {len(upsert_payload)} records (total: {total_migrated}/{total_edges})"
)
if total_migrated > 0:
# Persist relation_chunks data to disk
await self.relation_chunks.index_done_callback()
logger.info(
f"Relation chunk_tracking migration completed: {total_migrated} records persisted"
)
async def get_graph_labels(self):
text = await self.chunk_entity_relation_graph.get_all_labels()
return text
@ -1676,6 +1860,8 @@ class LightRAG:
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
llm_response_cache=self.llm_response_cache,
entity_chunks_storage=self.entity_chunks,
relation_chunks_storage=self.relation_chunks,
current_file_number=current_file_number,
total_files=total_files,
file_path=file_path,
@ -1845,6 +2031,8 @@ class LightRAG:
self.text_chunks,
self.full_entities,
self.full_relations,
self.entity_chunks,
self.relation_chunks,
self.llm_response_cache,
self.entities_vdb,
self.relationships_vdb,
@ -2718,9 +2906,11 @@ class LightRAG:
# 4. Analyze entities and relationships that will be affected
entities_to_delete = set()
entities_to_rebuild = {}  # entity_name -> remaining chunk id list
relationships_to_delete = set()
relationships_to_rebuild = {}  # (src, tgt) -> remaining chunk id list
entity_chunk_updates: dict[str, list[str]] = {}
relation_chunk_updates: dict[tuple[str, str], list[str]] = {}
try:
# Get affected entities and relations from full_entities and full_relations storage
@ -2776,14 +2966,41 @@ class LightRAG:
# Process entities
for node_data in affected_nodes:
node_label = node_data.get("entity_id")
if node_label and "source_id" in node_data: if not node_label:
sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP)) continue
remaining_sources = sources - chunk_ids
if not remaining_sources: existing_sources: list[str] = []
entities_to_delete.add(node_label) if self.entity_chunks:
elif remaining_sources != sources: stored_chunks = await self.entity_chunks.get_by_id(node_label)
entities_to_rebuild[node_label] = remaining_sources if stored_chunks and isinstance(stored_chunks, dict):
existing_sources = [
chunk_id
for chunk_id in stored_chunks.get("chunk_ids", [])
if chunk_id
]
if not existing_sources and node_data.get("source_id"):
existing_sources = [
chunk_id
for chunk_id in node_data["source_id"].split(
GRAPH_FIELD_SEP
)
if chunk_id
]
if not existing_sources:
continue
remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
if not remaining_sources:
entities_to_delete.add(node_label)
entity_chunk_updates[node_label] = []
elif remaining_sources != existing_sources:
entities_to_rebuild[node_label] = remaining_sources
entity_chunk_updates[node_label] = remaining_sources
else:
logger.info(f"Untouch entity: {node_label}")
async with pipeline_status_lock:
log_message = f"Found {len(entities_to_rebuild)} affected entities"
@ -2796,21 +3013,51 @@ class LightRAG:
src = edge_data.get("source") src = edge_data.get("source")
tgt = edge_data.get("target") tgt = edge_data.get("target")
if src and tgt and "source_id" in edge_data: if not src or not tgt or "source_id" not in edge_data:
edge_tuple = tuple(sorted((src, tgt))) continue
if (
edge_tuple in relationships_to_delete
or edge_tuple in relationships_to_rebuild
):
continue
sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP)) edge_tuple = tuple(sorted((src, tgt)))
remaining_sources = sources - chunk_ids if (
edge_tuple in relationships_to_delete
or edge_tuple in relationships_to_rebuild
):
continue
if not remaining_sources: existing_sources: list[str] = []
relationships_to_delete.add(edge_tuple) if self.relation_chunks:
elif remaining_sources != sources: storage_key = make_relation_chunk_key(src, tgt)
relationships_to_rebuild[edge_tuple] = remaining_sources stored_chunks = await self.relation_chunks.get_by_id(
storage_key
)
if stored_chunks and isinstance(stored_chunks, dict):
existing_sources = [
chunk_id
for chunk_id in stored_chunks.get("chunk_ids", [])
if chunk_id
]
if not existing_sources:
existing_sources = [
chunk_id
for chunk_id in edge_data["source_id"].split(
GRAPH_FIELD_SEP
)
if chunk_id
]
if not existing_sources:
continue
remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
if not remaining_sources:
relationships_to_delete.add(edge_tuple)
relation_chunk_updates[edge_tuple] = []
elif remaining_sources != existing_sources:
relationships_to_rebuild[edge_tuple] = remaining_sources
relation_chunk_updates[edge_tuple] = remaining_sources
else:
logger.info(f"Untouch relation: {edge_tuple}")
async with pipeline_status_lock:
log_message = (
@ -2820,6 +3067,45 @@ class LightRAG:
pipeline_status["latest_message"] = log_message pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message) pipeline_status["history_messages"].append(log_message)
current_time = int(time.time())
if entity_chunk_updates and self.entity_chunks:
entity_upsert_payload = {}
entity_delete_ids: set[str] = set()
for entity_name, remaining in entity_chunk_updates.items():
if not remaining:
entity_delete_ids.add(entity_name)
else:
entity_upsert_payload[entity_name] = {
"chunk_ids": remaining,
"count": len(remaining),
"updated_at": current_time,
}
if entity_delete_ids:
await self.entity_chunks.delete(list(entity_delete_ids))
if entity_upsert_payload:
await self.entity_chunks.upsert(entity_upsert_payload)
if relation_chunk_updates and self.relation_chunks:
relation_upsert_payload = {}
relation_delete_ids: set[str] = set()
for edge_tuple, remaining in relation_chunk_updates.items():
storage_key = make_relation_chunk_key(*edge_tuple)
if not remaining:
relation_delete_ids.add(storage_key)
else:
relation_upsert_payload[storage_key] = {
"chunk_ids": remaining,
"count": len(remaining),
"updated_at": current_time,
}
if relation_delete_ids:
await self.relation_chunks.delete(list(relation_delete_ids))
if relation_upsert_payload:
await self.relation_chunks.upsert(relation_upsert_payload)
except Exception as e:
logger.error(f"Failed to process graph analysis results: {e}")
raise Exception(f"Failed to process graph dependencies: {e}") from e
@ -2914,6 +3200,8 @@ class LightRAG:
global_config=asdict(self),
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
entity_chunks_storage=self.entity_chunks,
relation_chunks_storage=self.relation_chunks,
)
except Exception as e:
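Two helpers imported above, subtract_source_ids and make_relation_chunk_key, are used throughout this file but defined elsewhere; a plausible sketch of their behavior, inferred from the call sites in this diff and labeled as an assumption:
# Assumed shapes, inferred from usage in this diff.
def subtract_source_ids(existing: list[str], removed: set[str]) -> list[str]:
    # Order-preserving removal of the deleted chunk ids.
    return [chunk_id for chunk_id in existing if chunk_id not in removed]

def make_relation_chunk_key(src: str, tgt: str) -> str:
    # Stable key for an undirected relation: sorted endpoints joined by the graph separator.
    return GRAPH_FIELD_SEP.join(sorted((src, tgt)))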


@ -10,6 +10,8 @@ class NameSpace:
KV_STORE_LLM_RESPONSE_CACHE = "llm_response_cache" KV_STORE_LLM_RESPONSE_CACHE = "llm_response_cache"
KV_STORE_FULL_ENTITIES = "full_entities" KV_STORE_FULL_ENTITIES = "full_entities"
KV_STORE_FULL_RELATIONS = "full_relations" KV_STORE_FULL_RELATIONS = "full_relations"
KV_STORE_ENTITY_CHUNKS = "entity_chunks"
KV_STORE_RELATION_CHUNKS = "relation_chunks"
VECTOR_STORE_ENTITIES = "entities" VECTOR_STORE_ENTITIES = "entities"
VECTOR_STORE_RELATIONSHIPS = "relationships" VECTOR_STORE_RELATIONSHIPS = "relationships"


@ -7,7 +7,7 @@ import json_repair
from typing import Any, AsyncIterator, overload, Literal
from collections import Counter, defaultdict
from .utils import (
from lightrag.utils import (
logger,
compute_mdhash_id,
Tokenizer,
@ -27,14 +27,16 @@ from .utils import (
pick_by_vector_similarity,
process_chunks_unified,
build_file_path,
truncate_entity_source_id,
safe_vdb_operation_with_exception,
create_prefixed_exception,
fix_tuple_delimiter_corruption,
convert_to_user_format,
generate_reference_list_from_chunks,
apply_source_ids_limit,
merge_source_ids,
make_relation_chunk_key,
)
from .base import (
from lightrag.base import (
BaseGraphStorage,
BaseKVStorage,
BaseVectorStorage,
@ -43,8 +45,8 @@ from .base import (
QueryResult,
QueryContextResult,
)
from .prompt import PROMPTS
from lightrag.prompt import PROMPTS
from .constants import (
from lightrag.constants import (
GRAPH_FIELD_SEP,
DEFAULT_MAX_ENTITY_TOKENS,
DEFAULT_MAX_RELATION_TOKENS,
@ -53,8 +55,9 @@ from .constants import (
DEFAULT_KG_CHUNK_PICK_METHOD,
DEFAULT_ENTITY_TYPES,
DEFAULT_SUMMARY_LANGUAGE,
SOURCE_IDS_LIMIT_METHOD_KEEP,
)
from .kg.shared_storage import get_storage_keyed_lock
from lightrag.kg.shared_storage import get_storage_keyed_lock
import time
from dotenv import load_dotenv
@ -474,8 +477,8 @@ async def _handle_single_relationship_extraction(
async def _rebuild_knowledge_from_chunks(
entities_to_rebuild: dict[str, set[str]],
entities_to_rebuild: dict[str, list[str]],
relationships_to_rebuild: dict[tuple[str, str], set[str]],
relationships_to_rebuild: dict[tuple[str, str], list[str]],
knowledge_graph_inst: BaseGraphStorage,
entities_vdb: BaseVectorStorage,
relationships_vdb: BaseVectorStorage,
@ -484,6 +487,8 @@ async def _rebuild_knowledge_from_chunks(
global_config: dict[str, str],
pipeline_status: dict | None = None,
pipeline_status_lock=None,
entity_chunks_storage: BaseKVStorage | None = None,
relation_chunks_storage: BaseKVStorage | None = None,
) -> None:
"""Rebuild entity and relationship descriptions from cached extraction results with parallel processing
@ -492,8 +497,8 @@ async def _rebuild_knowledge_from_chunks(
controlled by llm_model_max_async and using get_storage_keyed_lock for data consistency.
Args:
entities_to_rebuild: Dict mapping entity_name -> list of remaining chunk_ids
relationships_to_rebuild: Dict mapping (src, tgt) -> list of remaining chunk_ids
knowledge_graph_inst: Knowledge graph storage
entities_vdb: Entity vector database
relationships_vdb: Relationship vector database
@ -502,6 +507,8 @@ async def _rebuild_knowledge_from_chunks(
global_config: Global configuration containing llm_model_max_async
pipeline_status: Pipeline status dictionary
pipeline_status_lock: Lock for pipeline status
entity_chunks_storage: KV storage maintaining full chunk IDs per entity
relation_chunks_storage: KV storage maintaining full chunk IDs per relation
""" """
if not entities_to_rebuild and not relationships_to_rebuild: if not entities_to_rebuild and not relationships_to_rebuild:
return return
@ -641,10 +648,11 @@ async def _rebuild_knowledge_from_chunks(
chunk_entities=chunk_entities,
llm_response_cache=llm_response_cache,
global_config=global_config,
entity_chunks_storage=entity_chunks_storage,
)
rebuilt_entities_count += 1
status_message = (
f"Rebuild `{entity_name}` from {len(chunk_ids)} chunks"
)
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
@ -682,16 +690,11 @@ async def _rebuild_knowledge_from_chunks(
chunk_relationships=chunk_relationships,
llm_response_cache=llm_response_cache,
global_config=global_config,
relation_chunks_storage=relation_chunks_storage,
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
)
rebuilt_relationships_count += 1
status_message = (
f"Rebuilt `{src} - {tgt}` from {len(chunk_ids)} chunks"
)
logger.info(status_message)
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
except Exception as e:
failed_relationships_count += 1
status_message = f"Failed to rebuild `{src} - {tgt}`: {e}"
@ -1002,10 +1005,13 @@ async def _rebuild_single_entity(
knowledge_graph_inst: BaseGraphStorage,
entities_vdb: BaseVectorStorage,
entity_name: str,
chunk_ids: set[str],
chunk_ids: list[str],
chunk_entities: dict,
llm_response_cache: BaseKVStorage,
global_config: dict[str, str],
entity_chunks_storage: BaseKVStorage | None = None,
pipeline_status: dict | None = None,
pipeline_status_lock=None,
) -> None:
"""Rebuild a single entity from cached extraction results"""
@ -1016,7 +1022,11 @@ async def _rebuild_single_entity(
# Helper function to update entity in both graph and vector storage
async def _update_entity_storage(
final_description: str, entity_type: str, file_paths: set[str]
final_description: str,
entity_type: str,
file_paths: set[str],
source_chunk_ids: list[str],
truncation_info: str = "",
):
try:
# Update entity in graph storage (critical path)
@ -1024,10 +1034,12 @@ async def _rebuild_single_entity(
**current_entity,
"description": final_description,
"entity_type": entity_type,
"source_id": GRAPH_FIELD_SEP.join(chunk_ids),
"source_id": GRAPH_FIELD_SEP.join(source_chunk_ids),
"file_path": GRAPH_FIELD_SEP.join(file_paths)
if file_paths
else current_entity.get("file_path", "unknown_source"),
"created_at": int(time.time()),
"truncate": truncation_info,
}
await knowledge_graph_inst.upsert_node(entity_name, updated_entity_data)
@ -1060,9 +1072,33 @@ async def _rebuild_single_entity(
logger.error(error_msg)
raise # Re-raise exception
# Collect all entity data from relevant chunks
# normalized_chunk_ids = merge_source_ids([], chunk_ids)
normalized_chunk_ids = chunk_ids
if entity_chunks_storage is not None and normalized_chunk_ids:
await entity_chunks_storage.upsert(
{
entity_name: {
"chunk_ids": normalized_chunk_ids,
"count": len(normalized_chunk_ids),
}
}
)
limit_method = (
global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP
)
limited_chunk_ids = apply_source_ids_limit(
normalized_chunk_ids,
global_config["max_source_ids_per_entity"],
limit_method,
identifier=f"`{entity_name}`",
)
# Collect all entity data from relevant (limited) chunks
all_entity_data = []
for chunk_id in chunk_ids:
for chunk_id in limited_chunk_ids:
if chunk_id in chunk_entities and entity_name in chunk_entities[chunk_id]:
all_entity_data.extend(chunk_entities[chunk_id][entity_name])
@ -1109,7 +1145,12 @@ async def _rebuild_single_entity(
final_description = current_entity.get("description", "")
entity_type = current_entity.get("entity_type", "UNKNOWN")
await _update_entity_storage(final_description, entity_type, file_paths)
await _update_entity_storage(
final_description,
entity_type,
file_paths,
limited_chunk_ids,
)
return
# Process cached entity data
@ -1149,7 +1190,31 @@ async def _rebuild_single_entity(
else:
final_description = current_entity.get("description", "")
await _update_entity_storage(final_description, entity_type, file_paths)
if len(limited_chunk_ids) < len(normalized_chunk_ids):
truncation_info = (
f"{limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)}"
)
else:
truncation_info = ""
await _update_entity_storage(
final_description,
entity_type,
file_paths,
limited_chunk_ids,
truncation_info,
)
# Log rebuild completion with truncation info
status_message = f"Rebuild `{entity_name}` from {len(chunk_ids)} chunks"
if truncation_info:
status_message += f" ({truncation_info})"
logger.info(status_message)
# Update pipeline status
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
async def _rebuild_single_relationship(
@ -1157,10 +1222,13 @@ async def _rebuild_single_relationship(
relationships_vdb: BaseVectorStorage,
src: str,
tgt: str,
chunk_ids: set[str],
chunk_ids: list[str],
chunk_relationships: dict,
llm_response_cache: BaseKVStorage,
global_config: dict[str, str],
relation_chunks_storage: BaseKVStorage | None = None,
pipeline_status: dict | None = None,
pipeline_status_lock=None,
) -> None:
"""Rebuild a single relationship from cached extraction results
@ -1173,9 +1241,33 @@ async def _rebuild_single_relationship(
if not current_relationship:
return
# normalized_chunk_ids = merge_source_ids([], chunk_ids)
normalized_chunk_ids = chunk_ids
if relation_chunks_storage is not None and normalized_chunk_ids:
storage_key = make_relation_chunk_key(src, tgt)
await relation_chunks_storage.upsert(
{
storage_key: {
"chunk_ids": normalized_chunk_ids,
"count": len(normalized_chunk_ids),
}
}
)
limit_method = (
global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP
)
limited_chunk_ids = apply_source_ids_limit(
normalized_chunk_ids,
global_config["max_source_ids_per_relation"],
limit_method,
identifier=f"`{src}`~`{tgt}`",
)
# Collect all relationship data from relevant chunks
all_relationship_data = []
for chunk_id in chunk_ids:
for chunk_id in limited_chunk_ids:
if chunk_id in chunk_relationships:
# Check both (src, tgt) and (tgt, src) since relationships can be bidirectional
for edge_key in [(src, tgt), (tgt, src)]:
@ -1230,6 +1322,13 @@ async def _rebuild_single_relationship(
# fallback to keep current(unchanged)
final_description = current_relationship.get("description", "")
if len(limited_chunk_ids) < len(normalized_chunk_ids):
truncation_info = (
f"{limit_method}:{len(limited_chunk_ids)}/{len(normalized_chunk_ids)}"
)
else:
truncation_info = ""
# Update relationship in graph storage
updated_relationship_data = {
**current_relationship,
@ -1238,10 +1337,11 @@ async def _rebuild_single_relationship(
else current_relationship.get("description", ""), else current_relationship.get("description", ""),
"keywords": combined_keywords, "keywords": combined_keywords,
"weight": weight, "weight": weight,
"source_id": GRAPH_FIELD_SEP.join(chunk_ids), "source_id": GRAPH_FIELD_SEP.join(limited_chunk_ids),
"file_path": GRAPH_FIELD_SEP.join([fp for fp in file_paths if fp]) "file_path": GRAPH_FIELD_SEP.join([fp for fp in file_paths if fp])
if file_paths if file_paths
else current_relationship.get("file_path", "unknown_source"), else current_relationship.get("file_path", "unknown_source"),
"truncate": truncation_info,
} }
await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data) await knowledge_graph_inst.upsert_edge(src, tgt, updated_relationship_data)
@ -1287,6 +1387,25 @@ async def _rebuild_single_relationship(
logger.error(error_msg) logger.error(error_msg)
raise # Re-raise exception raise # Re-raise exception
# Log rebuild completion with truncation info (already computed above from apply_source_ids_limit)
status_message = f"Rebuild `{src} - {tgt}` from {len(chunk_ids)} chunks"
if truncation_info:
    status_message += f" ({truncation_info})"
logger.info(status_message)
# Update pipeline status
if pipeline_status is not None and pipeline_status_lock is not None:
async with pipeline_status_lock:
pipeline_status["latest_message"] = status_message
pipeline_status["history_messages"].append(status_message)
async def _merge_nodes_then_upsert( async def _merge_nodes_then_upsert(
entity_name: str, entity_name: str,
@ -1296,6 +1415,7 @@ async def _merge_nodes_then_upsert(
pipeline_status: dict = None, pipeline_status: dict = None,
pipeline_status_lock=None, pipeline_status_lock=None,
llm_response_cache: BaseKVStorage | None = None, llm_response_cache: BaseKVStorage | None = None,
entity_chunks_storage: BaseKVStorage | None = None,
): ):
"""Get existing nodes from knowledge graph use name,if exists, merge data, else create, then upsert.""" """Get existing nodes from knowledge graph use name,if exists, merge data, else create, then upsert."""
already_entity_types = [] already_entity_types = []
@ -1318,10 +1438,78 @@ async def _merge_nodes_then_upsert(
reverse=True, reverse=True,
)[0][0] # Get the entity type with the highest count )[0][0] # Get the entity type with the highest count
original_nodes_count = len(nodes_data)
new_source_ids = [dp["source_id"] for dp in nodes_data if dp.get("source_id")]
existing_full_source_ids = []
if entity_chunks_storage is not None:
stored_chunks = await entity_chunks_storage.get_by_id(entity_name)
if stored_chunks and isinstance(stored_chunks, dict):
existing_full_source_ids = [
chunk_id for chunk_id in stored_chunks.get("chunk_ids", []) if chunk_id
]
if not existing_full_source_ids:
existing_full_source_ids = [
chunk_id for chunk_id in already_source_ids if chunk_id
]
full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids)
if entity_chunks_storage is not None and full_source_ids:
await entity_chunks_storage.upsert(
{
entity_name: {
"chunk_ids": full_source_ids,
"count": len(full_source_ids),
}
}
)
limit_method = (
global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP
)
source_ids = apply_source_ids_limit(
full_source_ids,
global_config["max_source_ids_per_entity"],
limit_method,
identifier=f"`{entity_name}`",
)
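A minimal sketch of what the tracking upsert above stores versus the limited list used downstream, assuming hypothetical chunk IDs, an entity named `Tesla`, and MAX_SOURCE_IDS_PER_ENTITY=3:

full_source_ids = ["chunk-1", "chunk-2", "chunk-3", "chunk-4"]  # complete history kept in entity_chunks_storage
record = {"Tesla": {"chunk_ids": full_source_ids, "count": len(full_source_ids)}}
source_ids = full_source_ids[:3]  # KEEP strategy: only this limited list is written to graph/vector storage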
# Only apply filtering in KEEP mode, where chunks beyond the limit are ignored
if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP:
allowed_source_ids = set(source_ids)
filtered_nodes = []
for dp in nodes_data:
source_id = dp.get("source_id")
# Skip descriptions sourced from chunks dropped by the KEEP cap
if (
source_id
and source_id not in allowed_source_ids
and source_id not in existing_full_source_ids
):
continue
filtered_nodes.append(dp)
nodes_data = filtered_nodes
else:
# In FIFO mode, keep all node descriptions - truncation happens at source_ids level only
nodes_data = list(nodes_data)
max_source_limit = global_config["max_source_ids_per_entity"]
skip_summary_due_to_limit = (
limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP
and len(existing_full_source_ids) >= max_source_limit
and not nodes_data
and already_description
)
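A worked sketch of the KEEP path under hypothetical values: the entity already tracks three chunks against a limit of three, so a fragment from a new chunk is filtered out and the summary step is skipped.

# Hypothetical state: the entity already tracks 3 chunks and the limit is 3
existing_full_source_ids = ["c1", "c2", "c3"]
source_ids = ["c1", "c2", "c3"]            # KEEP keeps the oldest ids, so "c4" is not admitted
allowed_source_ids = set(source_ids)
nodes_data = [{"source_id": "c4", "description": "fragment from a new chunk"}]
nodes_data = [
    dp for dp in nodes_data
    if not (
        dp.get("source_id")
        and dp["source_id"] not in allowed_source_ids
        and dp["source_id"] not in existing_full_source_ids
    )
]
already_description = ["existing summary"]
skip_summary_due_to_limit = (
    len(existing_full_source_ids) >= 3 and not nodes_data and bool(already_description)
)
print(nodes_data, skip_summary_due_to_limit)  # [] True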
# Deduplicate by description, keeping first occurrence # Deduplicate by description, keeping first occurrence
unique_nodes = {} unique_nodes = {}
for dp in nodes_data: for dp in nodes_data:
desc = dp["description"] desc = dp.get("description")
if not desc:
continue
if desc not in unique_nodes: if desc not in unique_nodes:
unique_nodes[desc] = dp unique_nodes[desc] = dp
@ -1332,17 +1520,31 @@ async def _merge_nodes_then_upsert(
) )
sorted_descriptions = [dp["description"] for dp in sorted_nodes] sorted_descriptions = [dp["description"] for dp in sorted_nodes]
truncation_info = ""
dd_message = ""
# Combine already_description with sorted new sorted descriptions # Combine already_description with sorted new sorted descriptions
description_list = already_description + sorted_descriptions description_list = already_description + sorted_descriptions
deduplicated_num = original_nodes_count - len(sorted_descriptions)
if deduplicated_num > 0:
dd_message = f"dd:{deduplicated_num}"
num_fragment = len(description_list) num_fragment = len(description_list)
already_fragment = len(already_description) already_fragment = len(already_description)
deduplicated_num = already_fragment + len(nodes_data) - num_fragment
if deduplicated_num > 0:
    dd_message = f"(dd:{deduplicated_num})"
else:
    dd_message = ""
if num_fragment > 0:

if skip_summary_due_to_limit:
    description = (
        already_node.get("description", "(no description)")
        if already_node
        else "(no description)"
    )
    llm_was_used = False
    status_message = f"Skip merge for `{entity_name}`: KEEP limit reached, new chunks ignored"
    logger.debug(status_message)
    if pipeline_status is not None and pipeline_status_lock is not None:
        async with pipeline_status_lock:
            pipeline_status["latest_message"] = status_message
            pipeline_status["history_messages"].append(status_message)
elif num_fragment > 0:
# Get summary and LLM usage status # Get summary and LLM usage status
description, llm_was_used = await _handle_entity_relation_summary( description, llm_was_used = await _handle_entity_relation_summary(
"Entity", "Entity",
@ -1355,9 +1557,16 @@ async def _merge_nodes_then_upsert(
# Log based on actual LLM usage # Log based on actual LLM usage
if llm_was_used: if llm_was_used:
status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" status_message = f"LLMmrg: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}"
else: else:
status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" status_message = f"Merged: `{entity_name}` | {already_fragment}+{num_fragment - already_fragment}"
# Add truncation info from apply_source_ids_limit if truncation occurred
if len(source_ids) < len(full_source_ids):
truncation_info = f"{limit_method}:{len(source_ids)}/{len(full_source_ids)}"
if dd_message or truncation_info:
    parts = [part for part in (truncation_info, dd_message) if part]
    status_message += f"({','.join(parts)})"
if already_fragment > 0 or llm_was_used: if already_fragment > 0 or llm_was_used:
logger.info(status_message) logger.info(status_message)
@ -1372,9 +1581,6 @@ async def _merge_nodes_then_upsert(
logger.error(f"Entity {entity_name} has no description") logger.error(f"Entity {entity_name} has no description")
description = "(no description)" description = "(no description)"
merged_source_ids: set = set([dp["source_id"] for dp in nodes_data] + already_source_ids)
source_ids = truncate_entity_source_id(merged_source_ids, entity_name, global_config)
source_id = GRAPH_FIELD_SEP.join(source_ids) source_id = GRAPH_FIELD_SEP.join(source_ids)
file_path = build_file_path(already_file_paths, nodes_data, entity_name) file_path = build_file_path(already_file_paths, nodes_data, entity_name)
@ -1386,6 +1592,7 @@ async def _merge_nodes_then_upsert(
source_id=source_id, source_id=source_id,
file_path=file_path, file_path=file_path,
created_at=int(time.time()), created_at=int(time.time()),
truncate=truncation_info,
) )
await knowledge_graph_inst.upsert_node( await knowledge_graph_inst.upsert_node(
entity_name, entity_name,
@ -1405,6 +1612,7 @@ async def _merge_edges_then_upsert(
pipeline_status_lock=None, pipeline_status_lock=None,
llm_response_cache: BaseKVStorage | None = None, llm_response_cache: BaseKVStorage | None = None,
added_entities: list = None, # New parameter to track entities added during edge processing added_entities: list = None, # New parameter to track entities added during edge processing
relation_chunks_storage: BaseKVStorage | None = None,
): ):
if src_id == tgt_id: if src_id == tgt_id:
return None return None
@ -1448,16 +1656,84 @@ async def _merge_edges_then_upsert(
) )
) )
original_edges_count = len(edges_data)
new_source_ids = [dp["source_id"] for dp in edges_data if dp.get("source_id")]
storage_key = make_relation_chunk_key(src_id, tgt_id)
existing_full_source_ids = []
if relation_chunks_storage is not None:
stored_chunks = await relation_chunks_storage.get_by_id(storage_key)
if stored_chunks and isinstance(stored_chunks, dict):
existing_full_source_ids = [
chunk_id for chunk_id in stored_chunks.get("chunk_ids", []) if chunk_id
]
if not existing_full_source_ids:
existing_full_source_ids = [
chunk_id for chunk_id in already_source_ids if chunk_id
]
full_source_ids = merge_source_ids(existing_full_source_ids, new_source_ids)
if relation_chunks_storage is not None and full_source_ids:
await relation_chunks_storage.upsert(
{
storage_key: {
"chunk_ids": full_source_ids,
"count": len(full_source_ids),
}
}
)
source_ids = apply_source_ids_limit(
full_source_ids,
global_config["max_source_ids_per_relation"],
global_config.get("source_ids_limit_method"),
identifier=f"`{src_id}`~`{tgt_id}`",
)
limit_method = (
global_config.get("source_ids_limit_method") or SOURCE_IDS_LIMIT_METHOD_KEEP
)
# Only apply filtering in KEEP mode, where chunks beyond the limit are ignored
if limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP:
allowed_source_ids = set(source_ids)
filtered_edges = []
for dp in edges_data:
source_id = dp.get("source_id")
# Skip relationship fragments sourced from chunks dropped by the KEEP cap
if (
source_id
and source_id not in allowed_source_ids
and source_id not in existing_full_source_ids
):
continue
filtered_edges.append(dp)
edges_data = filtered_edges
else:
# In FIFO mode, keep all edge descriptions - truncation happens at source_ids level only
edges_data = list(edges_data)
max_source_limit = global_config["max_source_ids_per_relation"]
skip_summary_due_to_limit = (
limit_method == SOURCE_IDS_LIMIT_METHOD_KEEP
and len(existing_full_source_ids) >= max_source_limit
and not edges_data
and already_description
)
# Process edges_data with None checks # Process edges_data with None checks
weight = sum([dp["weight"] for dp in edges_data] + already_weights) weight = sum([dp["weight"] for dp in edges_data] + already_weights)
# Deduplicate by description, keeping first occurrence # Deduplicate by description, keeping first occurrence
unique_edges = {} unique_edges = {}
for dp in edges_data: for dp in edges_data:
if dp.get("description"): description_value = dp.get("description")
desc = dp["description"] if not description_value:
if desc not in unique_edges: continue
unique_edges[desc] = dp if description_value not in unique_edges:
unique_edges[description_value] = dp
# Sort description by timestamp, then by description length (largest to smallest) when timestamps are the same # Sort description by timestamp, then by description length (largest to smallest) when timestamps are the same
sorted_edges = sorted( sorted_edges = sorted(
@ -1466,17 +1742,34 @@ async def _merge_edges_then_upsert(
) )
sorted_descriptions = [dp["description"] for dp in sorted_edges] sorted_descriptions = [dp["description"] for dp in sorted_edges]
truncation_info = ""
dd_message = ""
# Combine already_description with sorted new descriptions # Combine already_description with sorted new descriptions
description_list = already_description + sorted_descriptions description_list = already_description + sorted_descriptions
deduplicated_num = original_edges_count - len(sorted_descriptions)
if deduplicated_num > 0:
dd_message = f"dd:{deduplicated_num}"
num_fragment = len(description_list) num_fragment = len(description_list)
already_fragment = len(already_description) already_fragment = len(already_description)
deduplicated_num = already_fragment + len(edges_data) - num_fragment
if deduplicated_num > 0:
    dd_message = f"(dd:{deduplicated_num})"
else:
    dd_message = ""
if num_fragment > 0:

if skip_summary_due_to_limit:
    description = (
        already_edge.get("description", "(no description)")
        if already_edge
        else "(no description)"
    )
    llm_was_used = False
    status_message = (
        f"Skip merge for `{src_id}`~`{tgt_id}`: KEEP limit reached, new chunks ignored"
    )
    logger.debug(status_message)
    if pipeline_status is not None and pipeline_status_lock is not None:
        async with pipeline_status_lock:
            pipeline_status["latest_message"] = status_message
            pipeline_status["history_messages"].append(status_message)
elif num_fragment > 0:
# Get summary and LLM usage status # Get summary and LLM usage status
description, llm_was_used = await _handle_entity_relation_summary( description, llm_was_used = await _handle_entity_relation_summary(
"Relation", "Relation",
@ -1489,9 +1782,16 @@ async def _merge_edges_then_upsert(
# Log based on actual LLM usage # Log based on actual LLM usage
if llm_was_used: if llm_was_used:
status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" status_message = f"LLMmrg: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}"
else: else:
status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}{dd_message}" status_message = f"Merged: `{src_id}`~`{tgt_id}` | {already_fragment}+{num_fragment - already_fragment}"
# Add truncation info from apply_source_ids_limit if truncation occurred
if len(source_ids) < len(full_source_ids):
truncation_info = f"{limit_method}:{len(source_ids)}/{len(full_source_ids)}"
if dd_message or truncation_info:
    parts = [part for part in (truncation_info, dd_message) if part]
    status_message += f"({','.join(parts)})"
if already_fragment > 0 or llm_was_used: if already_fragment > 0 or llm_was_used:
logger.info(status_message) logger.info(status_message)
@ -1521,12 +1821,7 @@ async def _merge_edges_then_upsert(
# Join all unique keywords with commas # Join all unique keywords with commas
keywords = ",".join(sorted(all_keywords)) keywords = ",".join(sorted(all_keywords))
source_id = GRAPH_FIELD_SEP.join(
    set(
        [dp["source_id"] for dp in edges_data if dp.get("source_id")]
        + already_source_ids
    )
)

source_id = GRAPH_FIELD_SEP.join(source_ids)
file_path = build_file_path(already_file_paths, edges_data, f"{src_id}-{tgt_id}") file_path = build_file_path(already_file_paths, edges_data, f"{src_id}-{tgt_id}")
for need_insert_id in [src_id, tgt_id]: for need_insert_id in [src_id, tgt_id]:
@ -1538,6 +1833,7 @@ async def _merge_edges_then_upsert(
"entity_type": "UNKNOWN", "entity_type": "UNKNOWN",
"file_path": file_path, "file_path": file_path,
"created_at": int(time.time()), "created_at": int(time.time()),
"truncate": "",
} }
await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data) await knowledge_graph_inst.upsert_node(need_insert_id, node_data=node_data)
@ -1563,6 +1859,7 @@ async def _merge_edges_then_upsert(
source_id=source_id, source_id=source_id,
file_path=file_path, file_path=file_path,
created_at=int(time.time()), created_at=int(time.time()),
truncate=truncation_info,
), ),
) )
@ -1574,6 +1871,7 @@ async def _merge_edges_then_upsert(
source_id=source_id, source_id=source_id,
file_path=file_path, file_path=file_path,
created_at=int(time.time()), created_at=int(time.time()),
truncate=truncation_info,
) )
return edge_data return edge_data
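For reference, a sketch of the kind of payload the edge upsert now carries; the new truncate field holds the marker string or stays empty, and any field names or values not visible in the diff above are illustrative assumptions only:

edge_data = dict(
    src_id="Tesla",                      # illustrative entity pair
    tgt_id="Elon Musk",
    description="merged relationship summary",
    keywords="ceo,founder",
    weight=4.0,
    source_id="chunk-1<SEP>chunk-2<SEP>chunk-3",  # joined with GRAPH_FIELD_SEP (separator value assumed)
    file_path="docs/tesla.md",
    created_at=1729400000,
    truncate="KEEP:3/5",                 # empty string when no source_id truncation occurred
)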
@ -1591,6 +1889,8 @@ async def merge_nodes_and_edges(
pipeline_status: dict = None, pipeline_status: dict = None,
pipeline_status_lock=None, pipeline_status_lock=None,
llm_response_cache: BaseKVStorage | None = None, llm_response_cache: BaseKVStorage | None = None,
entity_chunks_storage: BaseKVStorage | None = None,
relation_chunks_storage: BaseKVStorage | None = None,
current_file_number: int = 0, current_file_number: int = 0,
total_files: int = 0, total_files: int = 0,
file_path: str = "unknown_source", file_path: str = "unknown_source",
@ -1614,6 +1914,8 @@ async def merge_nodes_and_edges(
pipeline_status: Pipeline status dictionary pipeline_status: Pipeline status dictionary
pipeline_status_lock: Lock for pipeline status pipeline_status_lock: Lock for pipeline status
llm_response_cache: LLM response cache llm_response_cache: LLM response cache
entity_chunks_storage: Storage tracking full chunk lists per entity
relation_chunks_storage: Storage tracking full chunk lists per relation
current_file_number: Current file number for logging current_file_number: Current file number for logging
total_files: Total files for logging total_files: Total files for logging
file_path: File path for logging file_path: File path for logging
@ -1671,6 +1973,7 @@ async def merge_nodes_and_edges(
pipeline_status, pipeline_status,
pipeline_status_lock, pipeline_status_lock,
llm_response_cache, llm_response_cache,
entity_chunks_storage,
) )
# Vector database operation (equally critical, must succeed) # Vector database operation (equally critical, must succeed)
@ -1689,7 +1992,6 @@ async def merge_nodes_and_edges(
} }
} }
logger.debug(f"Inserting {entity_name} in Graph") logger.debug(f"Inserting {entity_name} in Graph")
# Use safe operation wrapper - VDB failure must throw exception # Use safe operation wrapper - VDB failure must throw exception
await safe_vdb_operation_with_exception( await safe_vdb_operation_with_exception(
@ -1804,6 +2106,7 @@ async def merge_nodes_and_edges(
pipeline_status_lock, pipeline_status_lock,
llm_response_cache, llm_response_cache,
added_entities, # Pass list to collect added entities added_entities, # Pass list to collect added entities
relation_chunks_storage,
) )
if edge_data is None: if edge_data is None:

View file

@ -15,7 +15,17 @@ from dataclasses import dataclass
from datetime import datetime from datetime import datetime
from functools import wraps from functools import wraps
from hashlib import md5 from hashlib import md5
from typing import Any, Protocol, Callable, TYPE_CHECKING, List, Optional
from typing import (
Any,
Protocol,
Callable,
TYPE_CHECKING,
List,
Optional,
Iterable,
Sequence,
Collection,
)
import numpy as np import numpy as np
from dotenv import load_dotenv from dotenv import load_dotenv
@ -26,6 +36,9 @@ from lightrag.constants import (
GRAPH_FIELD_SEP, GRAPH_FIELD_SEP,
DEFAULT_MAX_TOTAL_TOKENS, DEFAULT_MAX_TOTAL_TOKENS,
DEFAULT_MAX_FILE_PATH_LENGTH, DEFAULT_MAX_FILE_PATH_LENGTH,
DEFAULT_SOURCE_IDS_LIMIT_METHOD,
VALID_SOURCE_IDS_LIMIT_METHODS,
SOURCE_IDS_LIMIT_METHOD_FIFO,
) )
# Initialize logger with basic configuration # Initialize logger with basic configuration
@ -2464,24 +2477,112 @@ async def process_chunks_unified(
return final_chunks return final_chunks
def truncate_entity_source_id(chunk_ids: set, entity_name: str, global_config: dict) -> set:
    """Limit chunk_ids, for entities that appear a HUGE no of times (To not break VDB hard upper limits)"""
    already_len: int = len(chunk_ids)
    max_chunk_ids_per_entity = global_config["max_source_ids_per_entity"]

    if already_len <= max_chunk_ids_per_entity:
        return chunk_ids

    logger.warning(
        f"Source Ids already exceeds {max_chunk_ids_per_entity} for {entity_name}, "
        f"current size: {already_len}, truncating..."
    )

    truncated_chunk_ids = set(list(chunk_ids)[0:max_chunk_ids_per_entity])
    return truncated_chunk_ids

def normalize_source_ids_limit_method(method: str | None) -> str:
    """Normalize the source ID limiting strategy and fall back to default when invalid."""
    if not method:
        return DEFAULT_SOURCE_IDS_LIMIT_METHOD

    normalized = method.upper()
    if normalized not in VALID_SOURCE_IDS_LIMIT_METHODS:
        logger.warning(
            "Unknown SOURCE_IDS_LIMIT_METHOD '%s', falling back to %s",
            method,
            DEFAULT_SOURCE_IDS_LIMIT_METHOD,
        )
        return DEFAULT_SOURCE_IDS_LIMIT_METHOD

    return normalized
existing_ids: Iterable[str] | None, new_ids: Iterable[str] | None
) -> list[str]:
"""Merge two iterables of source IDs while preserving order and removing duplicates."""
merged: list[str] = []
seen: set[str] = set()
for sequence in (existing_ids, new_ids):
if not sequence:
continue
for source_id in sequence:
if not source_id:
continue
if source_id not in seen:
seen.add(source_id)
merged.append(source_id)
return merged
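A usage sketch of the order-preserving merge (import path assumed; IDs hypothetical):

from lightrag.utils import merge_source_ids

existing = ["c1", "c2", "c3"]
incoming = ["c3", "c4", "", "c1"]  # duplicates and empty entries are dropped
print(merge_source_ids(existing, incoming))  # ['c1', 'c2', 'c3', 'c4']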
def apply_source_ids_limit(
source_ids: Sequence[str],
limit: int,
method: str,
*,
identifier: str | None = None,
) -> list[str]:
"""Apply a limit strategy to a sequence of source IDs."""
if limit <= 0:
return []
source_ids_list = list(source_ids)
if len(source_ids_list) <= limit:
return source_ids_list
normalized_method = normalize_source_ids_limit_method(method)
if normalized_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
truncated = source_ids_list[-limit:]
else:  # KEEP: ignore chunks that arrive after the limit is reached
truncated = source_ids_list[:limit]
if identifier and len(truncated) < len(source_ids_list):
logger.debug(
"Source_id truncated: %s | %s keeping %s of %s entries",
identifier,
normalized_method,
len(truncated),
len(source_ids_list),
)
return truncated
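A sketch contrasting the two strategies on the same ordered list (import paths assumed; IDs hypothetical):

from lightrag.constants import SOURCE_IDS_LIMIT_METHOD_FIFO, SOURCE_IDS_LIMIT_METHOD_KEEP
from lightrag.utils import apply_source_ids_limit

ids = ["c1", "c2", "c3", "c4", "c5"]
print(apply_source_ids_limit(ids, 3, SOURCE_IDS_LIMIT_METHOD_KEEP))  # ['c1', 'c2', 'c3'] keep oldest, ignore new
print(apply_source_ids_limit(ids, 3, SOURCE_IDS_LIMIT_METHOD_FIFO))  # ['c3', 'c4', 'c5'] newest replace oldest
print(apply_source_ids_limit(ids, 0, SOURCE_IDS_LIMIT_METHOD_KEEP))  # [] non-positive limit clears the list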
def subtract_source_ids(
source_ids: Iterable[str],
ids_to_remove: Collection[str],
) -> list[str]:
"""Remove a collection of IDs from an ordered iterable while preserving order."""
removal_set = set(ids_to_remove)
if not removal_set:
return [source_id for source_id in source_ids if source_id]
return [
source_id
for source_id in source_ids
if source_id and source_id not in removal_set
]
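A usage sketch of the subtraction helper (import path assumed; IDs hypothetical):

from lightrag.utils import subtract_source_ids

tracked = ["c1", "c2", "", "c3", "c4"]
print(subtract_source_ids(tracked, {"c2", "c4"}))  # ['c1', 'c3'] order preserved, empty ids dropped
print(subtract_source_ids(tracked, set()))         # ['c1', 'c2', 'c3', 'c4'] only empty ids removed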
def make_relation_chunk_key(src: str, tgt: str) -> str:
"""Create a deterministic storage key for relation chunk tracking."""
return GRAPH_FIELD_SEP.join(sorted((src, tgt)))
def parse_relation_chunk_key(key: str) -> tuple[str, str]:
"""Parse a relation chunk storage key back into its entity pair."""
parts = key.split(GRAPH_FIELD_SEP)
if len(parts) != 2:
raise ValueError(f"Invalid relation chunk key: {key}")
return parts[0], parts[1]
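A round-trip sketch; because the pair is sorted before joining with GRAPH_FIELD_SEP, the key is direction-independent (import path assumed; entity names hypothetical):

from lightrag.utils import make_relation_chunk_key, parse_relation_chunk_key

key = make_relation_chunk_key("Tesla", "Elon Musk")
assert key == make_relation_chunk_key("Elon Musk", "Tesla")  # same key for either direction
print(parse_relation_chunk_key(key))  # ('Elon Musk', 'Tesla'), returned in sorted order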
def build_file_path(already_file_paths, data_list, target): def build_file_path(already_file_paths, data_list, target):
"""Build file path string with UTF-8 byte length limit and deduplication """Build file path string with UTF-8 byte length limit and deduplication