Merge pull request #2240 from danielaskdd/limit-vdb-metadata-size

Refact: Limit Vector Database Metadata Size
This commit is contained in:
Daniel.y 2025-10-21 20:55:51 +08:00 committed by GitHub
commit aee0afdd46
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
20 changed files with 1494 additions and 505 deletions

View file

@ -74,11 +74,6 @@ ENABLE_LLM_CACHE=true
### control the maximum tokens send to LLM (include entities, relations and chunks)
# MAX_TOTAL_TOKENS=30000
### maximum number of related chunks per source entity or relation
### The chunk picker uses this value to determine the total number of chunks selected from KG(knowledge graph)
### Higher values increase re-ranking time
# RELATED_CHUNK_NUMBER=5
### chunk selection strategies
### VECTOR: Pick KG chunks by vector similarity, delivered chunks to the LLM aligning more closely with naive retrieval
### WEIGHT: Pick KG chunks by entity and chunk weight, delivered more solely KG related chunks to the LLM
@ -140,6 +135,22 @@ SUMMARY_LANGUAGE=English
### Maximum context size sent to LLM for description summary
# SUMMARY_CONTEXT_SIZE=12000
### control the maximum number of chunk_ids stored in vector and graph db
# MAX_SOURCE_IDS_PER_ENTITY=300
# MAX_SOURCE_IDS_PER_RELATION=300
### control chunk_ids limitation method: FIFO, KEEP
### FIFO: First in first out
### KEEP: Keep oldest (fewer merge actions and faster)
# SOURCE_IDS_LIMIT_METHOD=FIFO
### Maximum number of file paths stored in entity/relation file_path field (for display only, does not affect query performance)
# MAX_FILE_PATHS=100
### maximum number of related chunks per source entity or relation
### The chunk picker uses this value to determine the total number of chunks selected from KG(knowledge graph)
### Higher values increase re-ranking time
# RELATED_CHUNK_NUMBER=5
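The two limit methods only differ in which end of a chunk_id list survives truncation. A small self-contained sketch of the intended behaviour (illustrative helper and made-up ids, not the library code; the real logic is apply_source_ids_limit in lightrag/utils.py further down):

def limit_chunk_ids(chunk_ids: list[str], limit: int, method: str = "FIFO") -> list[str]:
    """Illustrative only: FIFO keeps the newest ids, KEEP keeps the oldest."""
    if limit <= 0:
        return []
    if len(chunk_ids) <= limit:
        return chunk_ids
    return chunk_ids[-limit:] if method == "FIFO" else chunk_ids[:limit]

ids = [f"chunk-{i}" for i in range(1, 6)]   # chunk-1 .. chunk-5, oldest first
print(limit_chunk_ids(ids, 3, "FIFO"))      # ['chunk-3', 'chunk-4', 'chunk-5']
print(limit_chunk_ids(ids, 3, "KEEP"))      # ['chunk-1', 'chunk-2', 'chunk-3']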
###############################
### Concurrency Configuration
###############################

View file

@ -1 +1 @@
__api_version__ = "0241"
__api_version__ = "0242"

View file

@ -2003,6 +2003,8 @@ def create_document_routes(
rag.full_docs,
rag.full_entities,
rag.full_relations,
rag.entity_chunks,
rag.relation_chunks,
rag.entities_vdb,
rag.relationships_vdb,
rag.chunks_vdb,

View file

@ -355,6 +355,14 @@ class BaseKVStorage(StorageNameSpace, ABC):
None
"""
@abstractmethod
async def is_empty(self) -> bool:
"""Check if the storage is empty
Returns:
bool: True if storage contains no data, False otherwise
"""
@dataclass
class BaseGraphStorage(StorageNameSpace, ABC):

View file

@ -57,8 +57,27 @@ DEFAULT_HISTORY_TURNS = 0
DEFAULT_MIN_RERANK_SCORE = 0.0
DEFAULT_RERANK_BINDING = "null"
# File path configuration for vector and graph database(Should not be changed, used in Milvus Schema)
# Default source ids limit in metadata for entity and relation
DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 300
DEFAULT_MAX_SOURCE_IDS_PER_RELATION = 300
### control chunk_ids limitation method: FIFO, KEEP
### FIFO: First in first out
### KEEP: Keep oldest (fewer merge actions and faster)
SOURCE_IDS_LIMIT_METHOD_KEEP = "KEEP"
SOURCE_IDS_LIMIT_METHOD_FIFO = "FIFO"
DEFAULT_SOURCE_IDS_LIMIT_METHOD = SOURCE_IDS_LIMIT_METHOD_FIFO
VALID_SOURCE_IDS_LIMIT_METHODS = {
SOURCE_IDS_LIMIT_METHOD_KEEP,
SOURCE_IDS_LIMIT_METHOD_FIFO,
}
# Maximum number of file paths stored in entity/relation file_path field (for display only, does not affect query performance)
DEFAULT_MAX_FILE_PATHS = 100
# Field length of file_path in Milvus Schema for entity and relation (Should not be changed)
# file_path must store all file paths up to the DEFAULT_MAX_FILE_PATHS limit within the metadata.
DEFAULT_MAX_FILE_PATH_LENGTH = 32768
# Placeholder for more file paths in metadata for entity and relation (Should not be changed)
DEFAULT_FILE_PATH_MORE_PLACEHOLDER = "truncated"
# Default temperature for LLM
DEFAULT_TEMPERATURE = 1.0

View file

@ -187,6 +187,20 @@ class JsonDocStatusStorage(DocStatusStorage):
await self.index_done_callback()
async def is_empty(self) -> bool:
"""Check if the storage is empty
Returns:
bool: True if storage is empty, False otherwise
Raises:
StorageNotInitializedError: If storage is not initialized
"""
if self._storage_lock is None:
raise StorageNotInitializedError("JsonDocStatusStorage")
async with self._storage_lock:
return len(self._data) == 0
async def get_by_id(self, id: str) -> Union[dict[str, Any], None]:
async with self._storage_lock:
return self._data.get(id)

View file

@ -84,26 +84,6 @@ class JsonKVStorage(BaseKVStorage):
write_json(data_dict, self._file_name)
await clear_all_update_flags(self.final_namespace)
async def get_all(self) -> dict[str, Any]:
"""Get all data from storage
Returns:
Dictionary containing all stored data
"""
async with self._storage_lock:
result = {}
for key, value in self._data.items():
if value:
# Create a copy to avoid modifying the original data
data = dict(value)
# Ensure time fields are present, provide default values for old data
data.setdefault("create_time", 0)
data.setdefault("update_time", 0)
result[key] = data
else:
result[key] = value
return result
async def get_by_id(self, id: str) -> dict[str, Any] | None:
async with self._storage_lock:
result = self._data.get(id)
@ -200,6 +180,15 @@ class JsonKVStorage(BaseKVStorage):
if any_deleted:
await set_all_update_flags(self.final_namespace)
async def is_empty(self) -> bool:
"""Check if the storage is empty
Returns:
bool: True if storage contains no data, False otherwise
"""
async with self._storage_lock:
return len(self._data) == 0
async def drop(self) -> dict[str, str]:
"""Drop all data from storage and clean up resources
This action will persist the data to disk immediately.

View file

@ -175,22 +175,6 @@ class MongoKVStorage(BaseKVStorage):
existing_ids = {str(x["_id"]) async for x in cursor}
return keys - existing_ids
async def get_all(self) -> dict[str, Any]:
"""Get all data from storage
Returns:
Dictionary containing all stored data
"""
cursor = self._data.find({})
result = {}
async for doc in cursor:
doc_id = doc.pop("_id")
# Ensure time fields are present for all documents
doc.setdefault("create_time", 0)
doc.setdefault("update_time", 0)
result[doc_id] = doc
return result
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
logger.debug(f"[{self.workspace}] Inserting {len(data)} to {self.namespace}")
if not data:
@ -236,6 +220,20 @@ class MongoKVStorage(BaseKVStorage):
# Mongo handles persistence automatically
pass
async def is_empty(self) -> bool:
"""Check if the storage is empty for the current workspace and namespace
Returns:
bool: True if storage is empty, False otherwise
"""
try:
# Use count_documents with limit 1 for efficiency
count = await self._data.count_documents({}, limit=1)
return count == 0
except PyMongoError as e:
logger.error(f"[{self.workspace}] Error checking if storage is empty: {e}")
return True
async def delete(self, ids: list[str]) -> None:
"""Delete documents with specified IDs
@ -466,6 +464,20 @@ class MongoDocStatusStorage(DocStatusStorage):
# Mongo handles persistence automatically
pass
async def is_empty(self) -> bool:
"""Check if the storage is empty for the current workspace and namespace
Returns:
bool: True if storage is empty, False otherwise
"""
try:
# Use count_documents with limit 1 for efficiency
count = await self._data.count_documents({}, limit=1)
return count == 0
except PyMongoError as e:
logger.error(f"[{self.workspace}] Error checking if storage is empty: {e}")
return True
async def drop(self) -> dict[str, str]:
"""Drop the storage by removing all documents in the collection.

View file

@ -1656,113 +1656,6 @@ class PGKVStorage(BaseKVStorage):
self.db = None
################ QUERY METHODS ################
async def get_all(self) -> dict[str, Any]:
"""Get all data from storage
Returns:
Dictionary containing all stored data
"""
table_name = namespace_to_table_name(self.namespace)
if not table_name:
logger.error(
f"[{self.workspace}] Unknown namespace for get_all: {self.namespace}"
)
return {}
sql = f"SELECT * FROM {table_name} WHERE workspace=$1"
params = {"workspace": self.workspace}
try:
results = await self.db.query(sql, list(params.values()), multirows=True)
# Special handling for LLM cache to ensure compatibility with _get_cached_extraction_results
if is_namespace(self.namespace, NameSpace.KV_STORE_LLM_RESPONSE_CACHE):
processed_results = {}
for row in results:
create_time = row.get("create_time", 0)
update_time = row.get("update_time", 0)
# Map field names and add cache_type for compatibility
processed_row = {
**row,
"return": row.get("return_value", ""),
"cache_type": row.get("original_prompt", "unknow"),
"original_prompt": row.get("original_prompt", ""),
"chunk_id": row.get("chunk_id"),
"mode": row.get("mode", "default"),
"create_time": create_time,
"update_time": create_time if update_time == 0 else update_time,
}
processed_results[row["id"]] = processed_row
return processed_results
# For text_chunks namespace, parse llm_cache_list JSON string back to list
if is_namespace(self.namespace, NameSpace.KV_STORE_TEXT_CHUNKS):
processed_results = {}
for row in results:
llm_cache_list = row.get("llm_cache_list", [])
if isinstance(llm_cache_list, str):
try:
llm_cache_list = json.loads(llm_cache_list)
except json.JSONDecodeError:
llm_cache_list = []
row["llm_cache_list"] = llm_cache_list
create_time = row.get("create_time", 0)
update_time = row.get("update_time", 0)
row["create_time"] = create_time
row["update_time"] = (
create_time if update_time == 0 else update_time
)
processed_results[row["id"]] = row
return processed_results
# For FULL_ENTITIES namespace, parse entity_names JSON string back to list
if is_namespace(self.namespace, NameSpace.KV_STORE_FULL_ENTITIES):
processed_results = {}
for row in results:
entity_names = row.get("entity_names", [])
if isinstance(entity_names, str):
try:
entity_names = json.loads(entity_names)
except json.JSONDecodeError:
entity_names = []
row["entity_names"] = entity_names
create_time = row.get("create_time", 0)
update_time = row.get("update_time", 0)
row["create_time"] = create_time
row["update_time"] = (
create_time if update_time == 0 else update_time
)
processed_results[row["id"]] = row
return processed_results
# For FULL_RELATIONS namespace, parse relation_pairs JSON string back to list
if is_namespace(self.namespace, NameSpace.KV_STORE_FULL_RELATIONS):
processed_results = {}
for row in results:
relation_pairs = row.get("relation_pairs", [])
if isinstance(relation_pairs, str):
try:
relation_pairs = json.loads(relation_pairs)
except json.JSONDecodeError:
relation_pairs = []
row["relation_pairs"] = relation_pairs
create_time = row.get("create_time", 0)
update_time = row.get("update_time", 0)
row["create_time"] = create_time
row["update_time"] = (
create_time if update_time == 0 else update_time
)
processed_results[row["id"]] = row
return processed_results
# For other namespaces, return as-is
return {row["id"]: row for row in results}
except Exception as e:
logger.error(
f"[{self.workspace}] Error retrieving all data from {self.namespace}: {e}"
)
return {}
async def get_by_id(self, id: str) -> dict[str, Any] | None:
"""Get data by id."""
sql = SQL_TEMPLATES["get_by_id_" + self.namespace]
@ -1838,6 +1731,38 @@ class PGKVStorage(BaseKVStorage):
response["create_time"] = create_time response["create_time"] = create_time
response["update_time"] = create_time if update_time == 0 else update_time response["update_time"] = create_time if update_time == 0 else update_time
# Special handling for ENTITY_CHUNKS namespace
if response and is_namespace(self.namespace, NameSpace.KV_STORE_ENTITY_CHUNKS):
# Parse chunk_ids JSON string back to list
chunk_ids = response.get("chunk_ids", [])
if isinstance(chunk_ids, str):
try:
chunk_ids = json.loads(chunk_ids)
except json.JSONDecodeError:
chunk_ids = []
response["chunk_ids"] = chunk_ids
create_time = response.get("create_time", 0)
update_time = response.get("update_time", 0)
response["create_time"] = create_time
response["update_time"] = create_time if update_time == 0 else update_time
# Special handling for RELATION_CHUNKS namespace
if response and is_namespace(
self.namespace, NameSpace.KV_STORE_RELATION_CHUNKS
):
# Parse chunk_ids JSON string back to list
chunk_ids = response.get("chunk_ids", [])
if isinstance(chunk_ids, str):
try:
chunk_ids = json.loads(chunk_ids)
except json.JSONDecodeError:
chunk_ids = []
response["chunk_ids"] = chunk_ids
create_time = response.get("create_time", 0)
update_time = response.get("update_time", 0)
response["create_time"] = create_time
response["update_time"] = create_time if update_time == 0 else update_time
return response if response else None
# Query by id
@ -1946,6 +1871,38 @@ class PGKVStorage(BaseKVStorage):
result["create_time"] = create_time result["create_time"] = create_time
result["update_time"] = create_time if update_time == 0 else update_time result["update_time"] = create_time if update_time == 0 else update_time
# Special handling for ENTITY_CHUNKS namespace
if results and is_namespace(self.namespace, NameSpace.KV_STORE_ENTITY_CHUNKS):
for result in results:
# Parse chunk_ids JSON string back to list
chunk_ids = result.get("chunk_ids", [])
if isinstance(chunk_ids, str):
try:
chunk_ids = json.loads(chunk_ids)
except json.JSONDecodeError:
chunk_ids = []
result["chunk_ids"] = chunk_ids
create_time = result.get("create_time", 0)
update_time = result.get("update_time", 0)
result["create_time"] = create_time
result["update_time"] = create_time if update_time == 0 else update_time
# Special handling for RELATION_CHUNKS namespace
if results and is_namespace(self.namespace, NameSpace.KV_STORE_RELATION_CHUNKS):
for result in results:
# Parse chunk_ids JSON string back to list
chunk_ids = result.get("chunk_ids", [])
if isinstance(chunk_ids, str):
try:
chunk_ids = json.loads(chunk_ids)
except json.JSONDecodeError:
chunk_ids = []
result["chunk_ids"] = chunk_ids
create_time = result.get("create_time", 0)
update_time = result.get("update_time", 0)
result["create_time"] = create_time
result["update_time"] = create_time if update_time == 0 else update_time
return _order_results(results)
async def filter_keys(self, keys: set[str]) -> set[str]:
@ -2050,11 +2007,61 @@ class PGKVStorage(BaseKVStorage):
"update_time": current_time, "update_time": current_time,
} }
await self.db.execute(upsert_sql, _data) await self.db.execute(upsert_sql, _data)
elif is_namespace(self.namespace, NameSpace.KV_STORE_ENTITY_CHUNKS):
# Get current UTC time and convert to naive datetime for database storage
current_time = datetime.datetime.now(timezone.utc).replace(tzinfo=None)
for k, v in data.items():
upsert_sql = SQL_TEMPLATES["upsert_entity_chunks"]
_data = {
"workspace": self.workspace,
"id": k,
"chunk_ids": json.dumps(v["chunk_ids"]),
"count": v["count"],
"create_time": current_time,
"update_time": current_time,
}
await self.db.execute(upsert_sql, _data)
elif is_namespace(self.namespace, NameSpace.KV_STORE_RELATION_CHUNKS):
# Get current UTC time and convert to naive datetime for database storage
current_time = datetime.datetime.now(timezone.utc).replace(tzinfo=None)
for k, v in data.items():
upsert_sql = SQL_TEMPLATES["upsert_relation_chunks"]
_data = {
"workspace": self.workspace,
"id": k,
"chunk_ids": json.dumps(v["chunk_ids"]),
"count": v["count"],
"create_time": current_time,
"update_time": current_time,
}
await self.db.execute(upsert_sql, _data)
async def index_done_callback(self) -> None:
# PG handles persistence automatically
pass
async def is_empty(self) -> bool:
"""Check if the storage is empty for the current workspace and namespace
Returns:
bool: True if storage is empty, False otherwise
"""
table_name = namespace_to_table_name(self.namespace)
if not table_name:
logger.error(
f"[{self.workspace}] Unknown namespace for is_empty check: {self.namespace}"
)
return True
sql = f"SELECT EXISTS(SELECT 1 FROM {table_name} WHERE workspace=$1 LIMIT 1) as has_data"
try:
result = await self.db.query(sql, [self.workspace])
return not result.get("has_data", False) if result else True
except Exception as e:
logger.error(f"[{self.workspace}] Error checking if storage is empty: {e}")
return True
async def delete(self, ids: list[str]) -> None:
"""Delete specific records from storage by their IDs
@ -2970,6 +2977,28 @@ class PGDocStatusStorage(DocStatusStorage):
# PG handles persistence automatically
pass
async def is_empty(self) -> bool:
"""Check if the storage is empty for the current workspace and namespace
Returns:
bool: True if storage is empty, False otherwise
"""
table_name = namespace_to_table_name(self.namespace)
if not table_name:
logger.error(
f"[{self.workspace}] Unknown namespace for is_empty check: {self.namespace}"
)
return True
sql = f"SELECT EXISTS(SELECT 1 FROM {table_name} WHERE workspace=$1 LIMIT 1) as has_data"
try:
result = await self.db.query(sql, [self.workspace])
return not result.get("has_data", False) if result else True
except Exception as e:
logger.error(f"[{self.workspace}] Error checking if storage is empty: {e}")
return True
async def delete(self, ids: list[str]) -> None:
"""Delete specific records from storage by their IDs
@ -4721,6 +4750,8 @@ NAMESPACE_TABLE_MAP = {
NameSpace.KV_STORE_TEXT_CHUNKS: "LIGHTRAG_DOC_CHUNKS",
NameSpace.KV_STORE_FULL_ENTITIES: "LIGHTRAG_FULL_ENTITIES",
NameSpace.KV_STORE_FULL_RELATIONS: "LIGHTRAG_FULL_RELATIONS",
NameSpace.KV_STORE_ENTITY_CHUNKS: "LIGHTRAG_ENTITY_CHUNKS",
NameSpace.KV_STORE_RELATION_CHUNKS: "LIGHTRAG_RELATION_CHUNKS",
NameSpace.KV_STORE_LLM_RESPONSE_CACHE: "LIGHTRAG_LLM_CACHE",
NameSpace.VECTOR_STORE_CHUNKS: "LIGHTRAG_VDB_CHUNKS",
NameSpace.VECTOR_STORE_ENTITIES: "LIGHTRAG_VDB_ENTITY",
@ -4861,6 +4892,28 @@ TABLES = {
CONSTRAINT LIGHTRAG_FULL_RELATIONS_PK PRIMARY KEY (workspace, id)
)"""
},
"LIGHTRAG_ENTITY_CHUNKS": {
"ddl": """CREATE TABLE LIGHTRAG_ENTITY_CHUNKS (
id VARCHAR(512),
workspace VARCHAR(255),
chunk_ids JSONB,
count INTEGER,
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_ENTITY_CHUNKS_PK PRIMARY KEY (workspace, id)
)"""
},
"LIGHTRAG_RELATION_CHUNKS": {
"ddl": """CREATE TABLE LIGHTRAG_RELATION_CHUNKS (
id VARCHAR(512),
workspace VARCHAR(255),
chunk_ids JSONB,
count INTEGER,
create_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
update_time TIMESTAMP(0) DEFAULT CURRENT_TIMESTAMP,
CONSTRAINT LIGHTRAG_RELATION_CHUNKS_PK PRIMARY KEY (workspace, id)
)"""
},
}
@ -4918,6 +4971,26 @@ SQL_TEMPLATES = {
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_FULL_RELATIONS WHERE workspace=$1 AND id = ANY($2)
""",
"get_by_id_entity_chunks": """SELECT id, chunk_ids, count,
EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_ENTITY_CHUNKS WHERE workspace=$1 AND id=$2
""",
"get_by_id_relation_chunks": """SELECT id, chunk_ids, count,
EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_RELATION_CHUNKS WHERE workspace=$1 AND id=$2
""",
"get_by_ids_entity_chunks": """SELECT id, chunk_ids, count,
EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_ENTITY_CHUNKS WHERE workspace=$1 AND id = ANY($2)
""",
"get_by_ids_relation_chunks": """SELECT id, chunk_ids, count,
EXTRACT(EPOCH FROM create_time)::BIGINT as create_time,
EXTRACT(EPOCH FROM update_time)::BIGINT as update_time
FROM LIGHTRAG_RELATION_CHUNKS WHERE workspace=$1 AND id = ANY($2)
""",
"filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})", "filter_keys": "SELECT id FROM {table_name} WHERE workspace=$1 AND id IN ({ids})",
"upsert_doc_full": """INSERT INTO LIGHTRAG_DOC_FULL (id, content, doc_name, workspace) "upsert_doc_full": """INSERT INTO LIGHTRAG_DOC_FULL (id, content, doc_name, workspace)
VALUES ($1, $2, $3, $4) VALUES ($1, $2, $3, $4)
@ -4965,6 +5038,22 @@ SQL_TEMPLATES = {
count=EXCLUDED.count,
update_time = EXCLUDED.update_time
""",
"upsert_entity_chunks": """INSERT INTO LIGHTRAG_ENTITY_CHUNKS (workspace, id, chunk_ids, count,
create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (workspace,id) DO UPDATE
SET chunk_ids=EXCLUDED.chunk_ids,
count=EXCLUDED.count,
update_time = EXCLUDED.update_time
""",
"upsert_relation_chunks": """INSERT INTO LIGHTRAG_RELATION_CHUNKS (workspace, id, chunk_ids, count,
create_time, update_time)
VALUES ($1, $2, $3, $4, $5, $6)
ON CONFLICT (workspace,id) DO UPDATE
SET chunk_ids=EXCLUDED.chunk_ids,
count=EXCLUDED.count,
update_time = EXCLUDED.update_time
""",
# SQL for VectorStorage
"upsert_chunk": """INSERT INTO LIGHTRAG_VDB_CHUNKS (workspace, id, tokens,
chunk_order_index, full_doc_id, content, content_vector, file_path,
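For orientation, the payload that feeds the upsert_entity_chunks/upsert_relation_chunks templates above is the plain KV record produced by the chunk-tracking stores: a list of chunk ids plus their count, keyed by the entity name or by the sorted relation pair. A hedged sketch (made-up entity names and chunk ids; assumes an initialized LightRAG instance named rag and an async context):

from lightrag.utils import make_relation_chunk_key

await rag.entity_chunks.upsert(
    {
        "Alan Turing": {
            "chunk_ids": ["chunk-1a2b", "chunk-3c4d"],  # serialized to JSON for the JSONB column
            "count": 2,
        }
    }
)

await rag.relation_chunks.upsert(
    {
        make_relation_chunk_key("Alan Turing", "Enigma"): {
            "chunk_ids": ["chunk-3c4d"],
            "count": 1,
        }
    }
)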

View file

@ -304,51 +304,6 @@ class RedisKVStorage(BaseKVStorage):
logger.error(f"[{self.workspace}] JSON decode error in batch get: {e}") logger.error(f"[{self.workspace}] JSON decode error in batch get: {e}")
return [None] * len(ids) return [None] * len(ids)
async def get_all(self) -> dict[str, Any]:
"""Get all data from storage
Returns:
Dictionary containing all stored data
"""
async with self._get_redis_connection() as redis:
try:
# Get all keys for this namespace
keys = await redis.keys(f"{self.final_namespace}:*")
if not keys:
return {}
# Get all values in batch
pipe = redis.pipeline()
for key in keys:
pipe.get(key)
values = await pipe.execute()
# Build result dictionary
result = {}
for key, value in zip(keys, values):
if value:
# Extract the ID part (after namespace:)
key_id = key.split(":", 1)[1]
try:
data = json.loads(value)
# Ensure time fields are present for all documents
data.setdefault("create_time", 0)
data.setdefault("update_time", 0)
result[key_id] = data
except json.JSONDecodeError as e:
logger.error(
f"[{self.workspace}] JSON decode error for key {key}: {e}"
)
continue
return result
except Exception as e:
logger.error(
f"[{self.workspace}] Error getting all data from Redis: {e}"
)
return {}
async def filter_keys(self, keys: set[str]) -> set[str]:
async with self._get_redis_connection() as redis:
pipe = redis.pipeline()
@ -407,8 +362,25 @@ class RedisKVStorage(BaseKVStorage):
# Redis handles persistence automatically
pass
async def is_empty(self) -> bool:
"""Check if the storage is empty for the current workspace and namespace
Returns:
bool: True if storage is empty, False otherwise
"""
pattern = f"{self.final_namespace}:*"
try:
async with self._get_redis_connection() as redis:
# Use scan to check if any keys exist
async for key in redis.scan_iter(match=pattern, count=1):
return False # Found at least one key
return True # No keys found
except Exception as e:
logger.error(f"[{self.workspace}] Error checking if storage is empty: {e}")
return True
async def delete(self, ids: list[str]) -> None:
"""Delete entries with specified IDs"""
"""Delete specific records from storage by their IDs"""
if not ids:
return
@ -868,6 +840,23 @@ class RedisDocStatusStorage(DocStatusStorage):
"""Redis handles persistence automatically""" """Redis handles persistence automatically"""
pass pass
async def is_empty(self) -> bool:
"""Check if the storage is empty for the current workspace and namespace
Returns:
bool: True if storage is empty, False otherwise
"""
pattern = f"{self.final_namespace}:*"
try:
async with self._get_redis_connection() as redis:
# Use scan to check if any keys exist
async for key in redis.scan_iter(match=pattern, count=1):
return False # Found at least one key
return True # No keys found
except Exception as e:
logger.error(f"[{self.workspace}] Error checking if storage is empty: {e}")
return True
@redis_retry
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
"""Insert or update document status data"""

View file

@ -40,10 +40,15 @@ from lightrag.constants import (
DEFAULT_MAX_ASYNC,
DEFAULT_MAX_PARALLEL_INSERT,
DEFAULT_MAX_GRAPH_NODES,
DEFAULT_MAX_SOURCE_IDS_PER_ENTITY,
DEFAULT_MAX_SOURCE_IDS_PER_RELATION,
DEFAULT_ENTITY_TYPES,
DEFAULT_SUMMARY_LANGUAGE,
DEFAULT_LLM_TIMEOUT,
DEFAULT_EMBEDDING_TIMEOUT,
DEFAULT_SOURCE_IDS_LIMIT_METHOD,
DEFAULT_MAX_FILE_PATHS,
DEFAULT_FILE_PATH_MORE_PLACEHOLDER,
)
from lightrag.utils import get_env_value
@ -98,6 +103,9 @@ from lightrag.utils import (
generate_track_id,
convert_to_user_format,
logger,
subtract_source_ids,
make_relation_chunk_key,
normalize_source_ids_limit_method,
)
from lightrag.types import KnowledgeGraph
from dotenv import load_dotenv
@ -360,6 +368,41 @@ class LightRAG:
)
"""Maximum number of graph nodes to return in knowledge graph queries."""
max_source_ids_per_entity: int = field(
default=get_env_value(
"MAX_SOURCE_IDS_PER_ENTITY", DEFAULT_MAX_SOURCE_IDS_PER_ENTITY, int
)
)
"""Maximum number of source (chunk) ids in entity Grpah + VDB."""
max_source_ids_per_relation: int = field(
default=get_env_value(
"MAX_SOURCE_IDS_PER_RELATION",
DEFAULT_MAX_SOURCE_IDS_PER_RELATION,
int,
)
)
"""Maximum number of source (chunk) ids in relation Graph + VDB."""
source_ids_limit_method: str = field(
default_factory=lambda: normalize_source_ids_limit_method(
get_env_value(
"SOURCE_IDS_LIMIT_METHOD",
DEFAULT_SOURCE_IDS_LIMIT_METHOD,
str,
)
)
)
"""Strategy for enforcing source_id limits: IGNORE_NEW or FIFO."""
max_file_paths: int = field(
default=get_env_value("MAX_FILE_PATHS", DEFAULT_MAX_FILE_PATHS, int)
)
"""Maximum number of file paths to store in entity/relation file_path field."""
file_path_more_placeholder: str = field(default=DEFAULT_FILE_PATH_MORE_PLACEHOLDER)
"""Placeholder text when file paths exceed max_file_paths limit."""
addon_params: dict[str, Any] = field(
default_factory=lambda: {
"language": get_env_value(
@ -529,6 +572,18 @@ class LightRAG:
embedding_func=self.embedding_func,
)
self.entity_chunks: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore
namespace=NameSpace.KV_STORE_ENTITY_CHUNKS,
workspace=self.workspace,
embedding_func=self.embedding_func,
)
self.relation_chunks: BaseKVStorage = self.key_string_value_json_storage_cls( # type: ignore
namespace=NameSpace.KV_STORE_RELATION_CHUNKS,
workspace=self.workspace,
embedding_func=self.embedding_func,
)
self.chunk_entity_relation_graph: BaseGraphStorage = self.graph_storage_cls( # type: ignore
namespace=NameSpace.GRAPH_STORE_CHUNK_ENTITY_RELATION,
workspace=self.workspace,
@ -588,6 +643,8 @@ class LightRAG:
self.text_chunks,
self.full_entities,
self.full_relations,
self.entity_chunks,
self.relation_chunks,
self.entities_vdb,
self.relationships_vdb,
self.chunks_vdb,
@ -610,6 +667,8 @@ class LightRAG:
("text_chunks", self.text_chunks), ("text_chunks", self.text_chunks),
("full_entities", self.full_entities), ("full_entities", self.full_entities),
("full_relations", self.full_relations), ("full_relations", self.full_relations),
("entity_chunks", self.entity_chunks),
("relation_chunks", self.relation_chunks),
("entities_vdb", self.entities_vdb), ("entities_vdb", self.entities_vdb),
("relationships_vdb", self.relationships_vdb), ("relationships_vdb", self.relationships_vdb),
("chunks_vdb", self.chunks_vdb), ("chunks_vdb", self.chunks_vdb),
@ -665,6 +724,13 @@ class LightRAG:
logger.debug("No entities found in graph, skipping migration check") logger.debug("No entities found in graph, skipping migration check")
return return
try:
# Initialize chunk tracking storage after migration
await self._migrate_chunk_tracking_storage()
except Exception as e:
logger.error(f"Error during chunk_tracking migration: {e}")
raise e
# Check if full_entities and full_relations are empty # Check if full_entities and full_relations are empty
# Get all processed documents to check their entity/relation data # Get all processed documents to check their entity/relation data
try: try:
@ -705,11 +771,11 @@ class LightRAG:
except Exception as e:
logger.error(f"Error during migration check: {e}")
# Don't raise the error, just log it to avoid breaking initialization
raise e
except Exception as e:
logger.error(f"Error in data migration check: {e}")
# Don't raise the error to avoid breaking initialization
raise e
async def _migrate_entity_relation_data(self, processed_docs: dict):
"""Migrate existing entity and relation data to full_entities and full_relations storage"""
@ -808,6 +874,140 @@ class LightRAG:
f"Data migration completed: migrated {migration_count} documents with entities/relations" f"Data migration completed: migrated {migration_count} documents with entities/relations"
) )
async def _migrate_chunk_tracking_storage(self) -> None:
"""Ensure entity/relation chunk tracking KV stores exist and are seeded."""
if not self.entity_chunks or not self.relation_chunks:
return
need_entity_migration = False
need_relation_migration = False
try:
need_entity_migration = await self.entity_chunks.is_empty()
except Exception as exc: # pragma: no cover - defensive logging
logger.error(f"Failed to check entity chunks storage: {exc}")
raise exc
try:
need_relation_migration = await self.relation_chunks.is_empty()
except Exception as exc: # pragma: no cover - defensive logging
logger.error(f"Failed to check relation chunks storage: {exc}")
raise exc
if not need_entity_migration and not need_relation_migration:
return
BATCH_SIZE = 500 # Process 500 records per batch
if need_entity_migration:
try:
nodes = await self.chunk_entity_relation_graph.get_all_nodes()
except Exception as exc:
logger.error(f"Failed to fetch nodes for chunk migration: {exc}")
nodes = []
logger.info(f"Starting chunk_tracking data migration: {len(nodes)} nodes")
# Process nodes in batches
total_nodes = len(nodes)
total_batches = (total_nodes + BATCH_SIZE - 1) // BATCH_SIZE
total_migrated = 0
for batch_idx in range(total_batches):
start_idx = batch_idx * BATCH_SIZE
end_idx = min((batch_idx + 1) * BATCH_SIZE, total_nodes)
batch_nodes = nodes[start_idx:end_idx]
upsert_payload: dict[str, dict[str, object]] = {}
for node in batch_nodes:
entity_id = node.get("entity_id") or node.get("id")
if not entity_id:
continue
raw_source = node.get("source_id") or ""
chunk_ids = [
chunk_id
for chunk_id in raw_source.split(GRAPH_FIELD_SEP)
if chunk_id
]
if not chunk_ids:
continue
upsert_payload[entity_id] = {
"chunk_ids": chunk_ids,
"count": len(chunk_ids),
}
if upsert_payload:
await self.entity_chunks.upsert(upsert_payload)
total_migrated += len(upsert_payload)
logger.info(
f"Processed entity batch {batch_idx + 1}/{total_batches}: {len(upsert_payload)} records (total: {total_migrated}/{total_nodes})"
)
if total_migrated > 0:
# Persist entity_chunks data to disk
await self.entity_chunks.index_done_callback()
logger.info(
f"Entity chunk_tracking migration completed: {total_migrated} records persisted"
)
if need_relation_migration:
try:
edges = await self.chunk_entity_relation_graph.get_all_edges()
except Exception as exc:
logger.error(f"Failed to fetch edges for chunk migration: {exc}")
edges = []
logger.info(f"Starting chunk_tracking data migration: {len(edges)} edges")
# Process edges in batches
total_edges = len(edges)
total_batches = (total_edges + BATCH_SIZE - 1) // BATCH_SIZE
total_migrated = 0
for batch_idx in range(total_batches):
start_idx = batch_idx * BATCH_SIZE
end_idx = min((batch_idx + 1) * BATCH_SIZE, total_edges)
batch_edges = edges[start_idx:end_idx]
upsert_payload: dict[str, dict[str, object]] = {}
for edge in batch_edges:
src = edge.get("source") or edge.get("src_id") or edge.get("src")
tgt = edge.get("target") or edge.get("tgt_id") or edge.get("tgt")
if not src or not tgt:
continue
raw_source = edge.get("source_id") or ""
chunk_ids = [
chunk_id
for chunk_id in raw_source.split(GRAPH_FIELD_SEP)
if chunk_id
]
if not chunk_ids:
continue
storage_key = make_relation_chunk_key(src, tgt)
upsert_payload[storage_key] = {
"chunk_ids": chunk_ids,
"count": len(chunk_ids),
}
if upsert_payload:
await self.relation_chunks.upsert(upsert_payload)
total_migrated += len(upsert_payload)
logger.info(
f"Processed relation batch {batch_idx + 1}/{total_batches}: {len(upsert_payload)} records (total: {total_migrated}/{total_edges})"
)
if total_migrated > 0:
# Persist relation_chunks data to disk
await self.relation_chunks.index_done_callback()
logger.info(
f"Relation chunk_tracking migration completed: {total_migrated} records persisted"
)
async def get_graph_labels(self):
text = await self.chunk_entity_relation_graph.get_all_labels()
return text
@ -1670,6 +1870,8 @@ class LightRAG:
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
llm_response_cache=self.llm_response_cache,
entity_chunks_storage=self.entity_chunks,
relation_chunks_storage=self.relation_chunks,
current_file_number=current_file_number,
total_files=total_files,
file_path=file_path,
@ -1839,6 +2041,8 @@ class LightRAG:
self.text_chunks,
self.full_entities,
self.full_relations,
self.entity_chunks,
self.relation_chunks,
self.llm_response_cache,
self.entities_vdb,
self.relationships_vdb,
@ -2712,9 +2916,11 @@ class LightRAG:
# 4. Analyze entities and relationships that will be affected
entities_to_delete = set()
entities_to_rebuild = {}  # entity_name -> remaining_chunk_ids
entities_to_rebuild = {}  # entity_name -> remaining chunk id list
relationships_to_delete = set()
relationships_to_rebuild = {}  # (src, tgt) -> remaining_chunk_ids
relationships_to_rebuild = {}  # (src, tgt) -> remaining chunk id list
entity_chunk_updates: dict[str, list[str]] = {}
relation_chunk_updates: dict[tuple[str, str], list[str]] = {}
try:
# Get affected entities and relations from full_entities and full_relations storage
@ -2770,14 +2976,41 @@ class LightRAG:
# Process entities
for node_data in affected_nodes:
node_label = node_data.get("entity_id")

if node_label and "source_id" in node_data:
sources = set(node_data["source_id"].split(GRAPH_FIELD_SEP))
remaining_sources = sources - chunk_ids
if not remaining_sources:
entities_to_delete.add(node_label)
elif remaining_sources != sources:
entities_to_rebuild[node_label] = remaining_sources

if not node_label:
continue
existing_sources: list[str] = []
if self.entity_chunks:
stored_chunks = await self.entity_chunks.get_by_id(node_label)
if stored_chunks and isinstance(stored_chunks, dict):
existing_sources = [
chunk_id
for chunk_id in stored_chunks.get("chunk_ids", [])
if chunk_id
]
if not existing_sources and node_data.get("source_id"):
existing_sources = [
chunk_id
for chunk_id in node_data["source_id"].split(
GRAPH_FIELD_SEP
)
if chunk_id
]
if not existing_sources:
continue
remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
if not remaining_sources:
entities_to_delete.add(node_label)
entity_chunk_updates[node_label] = []
elif remaining_sources != existing_sources:
entities_to_rebuild[node_label] = remaining_sources
entity_chunk_updates[node_label] = remaining_sources
else:
logger.info(f"Untouch entity: {node_label}")
async with pipeline_status_lock:
log_message = f"Found {len(entities_to_rebuild)} affected entities"
@ -2790,21 +3023,51 @@ class LightRAG:
src = edge_data.get("source") src = edge_data.get("source")
tgt = edge_data.get("target") tgt = edge_data.get("target")
if src and tgt and "source_id" in edge_data: if not src or not tgt or "source_id" not in edge_data:
edge_tuple = tuple(sorted((src, tgt))) continue
if (
edge_tuple in relationships_to_delete
or edge_tuple in relationships_to_rebuild
):
continue
sources = set(edge_data["source_id"].split(GRAPH_FIELD_SEP)) edge_tuple = tuple(sorted((src, tgt)))
remaining_sources = sources - chunk_ids if (
edge_tuple in relationships_to_delete
or edge_tuple in relationships_to_rebuild
):
continue
if not remaining_sources: existing_sources: list[str] = []
relationships_to_delete.add(edge_tuple) if self.relation_chunks:
elif remaining_sources != sources: storage_key = make_relation_chunk_key(src, tgt)
relationships_to_rebuild[edge_tuple] = remaining_sources stored_chunks = await self.relation_chunks.get_by_id(
storage_key
)
if stored_chunks and isinstance(stored_chunks, dict):
existing_sources = [
chunk_id
for chunk_id in stored_chunks.get("chunk_ids", [])
if chunk_id
]
if not existing_sources:
existing_sources = [
chunk_id
for chunk_id in edge_data["source_id"].split(
GRAPH_FIELD_SEP
)
if chunk_id
]
if not existing_sources:
continue
remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
if not remaining_sources:
relationships_to_delete.add(edge_tuple)
relation_chunk_updates[edge_tuple] = []
elif remaining_sources != existing_sources:
relationships_to_rebuild[edge_tuple] = remaining_sources
relation_chunk_updates[edge_tuple] = remaining_sources
else:
logger.info(f"Untouch relation: {edge_tuple}")
async with pipeline_status_lock:
log_message = (
@ -2814,6 +3077,45 @@ class LightRAG:
pipeline_status["latest_message"] = log_message pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message) pipeline_status["history_messages"].append(log_message)
current_time = int(time.time())
if entity_chunk_updates and self.entity_chunks:
entity_upsert_payload = {}
entity_delete_ids: set[str] = set()
for entity_name, remaining in entity_chunk_updates.items():
if not remaining:
entity_delete_ids.add(entity_name)
else:
entity_upsert_payload[entity_name] = {
"chunk_ids": remaining,
"count": len(remaining),
"updated_at": current_time,
}
if entity_delete_ids:
await self.entity_chunks.delete(list(entity_delete_ids))
if entity_upsert_payload:
await self.entity_chunks.upsert(entity_upsert_payload)
if relation_chunk_updates and self.relation_chunks:
relation_upsert_payload = {}
relation_delete_ids: set[str] = set()
for edge_tuple, remaining in relation_chunk_updates.items():
storage_key = make_relation_chunk_key(*edge_tuple)
if not remaining:
relation_delete_ids.add(storage_key)
else:
relation_upsert_payload[storage_key] = {
"chunk_ids": remaining,
"count": len(remaining),
"updated_at": current_time,
}
if relation_delete_ids:
await self.relation_chunks.delete(list(relation_delete_ids))
if relation_upsert_payload:
await self.relation_chunks.upsert(relation_upsert_payload)
except Exception as e:
logger.error(f"Failed to process graph analysis results: {e}")
raise Exception(f"Failed to process graph dependencies: {e}") from e
@ -2908,6 +3210,8 @@ class LightRAG:
global_config=asdict(self),
pipeline_status=pipeline_status,
pipeline_status_lock=pipeline_status_lock,
entity_chunks_storage=self.entity_chunks,
relation_chunks_storage=self.relation_chunks,
)
except Exception as e:

View file

@ -10,6 +10,8 @@ class NameSpace:
KV_STORE_LLM_RESPONSE_CACHE = "llm_response_cache"
KV_STORE_FULL_ENTITIES = "full_entities"
KV_STORE_FULL_RELATIONS = "full_relations"
KV_STORE_ENTITY_CHUNKS = "entity_chunks"
KV_STORE_RELATION_CHUNKS = "relation_chunks"
VECTOR_STORE_ENTITIES = "entities"
VECTOR_STORE_RELATIONSHIPS = "relationships"

File diff suppressed because it is too large

View file

@ -15,7 +15,17 @@ from dataclasses import dataclass
from datetime import datetime
from functools import wraps
from hashlib import md5
from typing import Any, Protocol, Callable, TYPE_CHECKING, List, Optional
from typing import (
Any,
Protocol,
Callable,
TYPE_CHECKING,
List,
Optional,
Iterable,
Sequence,
Collection,
)
import numpy as np
from dotenv import load_dotenv
@ -25,7 +35,9 @@ from lightrag.constants import (
DEFAULT_LOG_FILENAME,
GRAPH_FIELD_SEP,
DEFAULT_MAX_TOTAL_TOKENS,
DEFAULT_MAX_FILE_PATH_LENGTH,
DEFAULT_SOURCE_IDS_LIMIT_METHOD,
VALID_SOURCE_IDS_LIMIT_METHODS,
SOURCE_IDS_LIMIT_METHOD_FIFO,
)
# Initialize logger with basic configuration
@ -2465,63 +2477,110 @@ async def process_chunks_unified(
return final_chunks
def build_file_path(already_file_paths, data_list, target):
"""Build file path string with UTF-8 byte length limit and deduplication
Args:
already_file_paths: List of existing file paths
data_list: List of data items containing file_path
target: Target name for logging warnings
Returns:
str: Combined file paths separated by GRAPH_FIELD_SEP
"""
# set: deduplication
file_paths_set = {fp for fp in already_file_paths if fp}
# string: filter empty value and keep file order in already_file_paths
file_paths = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp)
# Check if initial file_paths already exceeds byte length limit
if len(file_paths.encode("utf-8")) >= DEFAULT_MAX_FILE_PATH_LENGTH:
logger.warning(
f"Initial file_paths already exceeds {DEFAULT_MAX_FILE_PATH_LENGTH} bytes for {target}, "
f"current size: {len(file_paths.encode('utf-8'))} bytes"
)
# ignored file_paths
file_paths_ignore = ""
# add file_paths
for dp in data_list:
cur_file_path = dp.get("file_path")
# empty
if not cur_file_path:
continue
# skip duplicate item
if cur_file_path in file_paths_set:
continue
# add
file_paths_set.add(cur_file_path)
# check the UTF-8 byte length
new_addition = GRAPH_FIELD_SEP + cur_file_path if file_paths else cur_file_path
if (
len(file_paths.encode("utf-8")) + len(new_addition.encode("utf-8"))
< DEFAULT_MAX_FILE_PATH_LENGTH - 5
):
# append
file_paths += new_addition
else:
# ignore
file_paths_ignore += GRAPH_FIELD_SEP + cur_file_path
if file_paths_ignore:
logger.warning(
f"File paths exceed {DEFAULT_MAX_FILE_PATH_LENGTH} bytes for {target}, "
f"ignoring file path: {file_paths_ignore}"
)
return file_paths

def normalize_source_ids_limit_method(method: str | None) -> str:
"""Normalize the source ID limiting strategy and fall back to default when invalid."""
if not method:
return DEFAULT_SOURCE_IDS_LIMIT_METHOD
normalized = method.upper()
if normalized not in VALID_SOURCE_IDS_LIMIT_METHODS:
logger.warning(
"Unknown SOURCE_IDS_LIMIT_METHOD '%s', falling back to %s",
method,
DEFAULT_SOURCE_IDS_LIMIT_METHOD,
)
return DEFAULT_SOURCE_IDS_LIMIT_METHOD
return normalized

def merge_source_ids(
existing_ids: Iterable[str] | None, new_ids: Iterable[str] | None
) -> list[str]:
"""Merge two iterables of source IDs while preserving order and removing duplicates."""
merged: list[str] = []
seen: set[str] = set()
for sequence in (existing_ids, new_ids):
if not sequence:
continue
for source_id in sequence:
if not source_id:
continue
if source_id not in seen:
seen.add(source_id)
merged.append(source_id)
return merged

def apply_source_ids_limit(
source_ids: Sequence[str],
limit: int,
method: str,
*,
identifier: str | None = None,
) -> list[str]:
"""Apply a limit strategy to a sequence of source IDs."""
if limit <= 0:
return []
source_ids_list = list(source_ids)
if len(source_ids_list) <= limit:
return source_ids_list
normalized_method = normalize_source_ids_limit_method(method)
if normalized_method == SOURCE_IDS_LIMIT_METHOD_FIFO:
truncated = source_ids_list[-limit:]
else:  # KEEP: retain the oldest entries
truncated = source_ids_list[:limit]
if identifier and len(truncated) < len(source_ids_list):
logger.debug(
"Source_id truncated: %s | %s keeping %s of %s entries",
identifier,
normalized_method,
len(truncated),
len(source_ids_list),
)
return truncated

def subtract_source_ids(
source_ids: Iterable[str],
ids_to_remove: Collection[str],
) -> list[str]:
"""Remove a collection of IDs from an ordered iterable while preserving order."""
removal_set = set(ids_to_remove)
if not removal_set:
return [source_id for source_id in source_ids if source_id]
return [
source_id
for source_id in source_ids
if source_id and source_id not in removal_set
]

def make_relation_chunk_key(src: str, tgt: str) -> str:
"""Create a deterministic storage key for relation chunk tracking."""
return GRAPH_FIELD_SEP.join(sorted((src, tgt)))

def parse_relation_chunk_key(key: str) -> tuple[str, str]:
"""Parse a relation chunk storage key back into its entity pair."""
parts = key.split(GRAPH_FIELD_SEP)
if len(parts) != 2:
raise ValueError(f"Invalid relation chunk key: {key}")
return parts[0], parts[1]
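Taken together, a hedged sketch of how these helpers compose during entity/relation merging (made-up chunk ids and limit):

from lightrag.utils import (
    apply_source_ids_limit,
    make_relation_chunk_key,
    merge_source_ids,
)

existing = ["chunk-1", "chunk-2", "chunk-3"]
incoming = ["chunk-3", "chunk-4", "chunk-5"]

merged = merge_source_ids(existing, incoming)
# ['chunk-1', 'chunk-2', 'chunk-3', 'chunk-4', 'chunk-5'] -- order kept, duplicate dropped

capped = apply_source_ids_limit(merged, 4, "FIFO", identifier="entity:Alan Turing")
# FIFO keeps the newest 4: ['chunk-2', 'chunk-3', 'chunk-4', 'chunk-5']
# method="KEEP" would keep the oldest 4: ['chunk-1', 'chunk-2', 'chunk-3', 'chunk-4']

key = make_relation_chunk_key("Enigma", "Alan Turing")
# The pair is sorted before joining with GRAPH_FIELD_SEP, so (a, b) and (b, a) share one record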
def generate_track_id(prefix: str = "upload") -> str:

View file

@ -183,7 +183,8 @@ const PropertyRow = ({
entityType,
sourceId,
targetId,
isEditable = false
isEditable = false,
truncate
}: {
name: string
value: any
@ -197,6 +198,7 @@ const PropertyRow = ({
sourceId?: string
targetId?: string
isEditable?: boolean
truncate?: string
}) => {
const { t } = useTranslation()
@ -216,7 +218,12 @@ const PropertyRow = ({
// Format the value to convert <SEP> to newlines
const formattedValue = formatValueWithSeparators(value)
const formattedTooltip = tooltip || formatValueWithSeparators(value)
let formattedTooltip = tooltip || formatValueWithSeparators(value)

// If this is source_id field and truncate info exists, append it to the tooltip
if (name === 'source_id' && truncate) {
formattedTooltip += `\n(Truncated: ${truncate})`
}

// Use EditablePropertyRow for editable fields (description, entity_id and keywords)
if (isEditable && (name === 'description' || name === 'entity_id' || name === 'keywords')) {
@ -241,7 +248,10 @@ const PropertyRow = ({
// For non-editable fields, use the regular Text component
return (
<div className="flex items-center gap-2">
<span className="text-primary/60 tracking-wide whitespace-nowrap">{getPropertyNameTranslation(name)}</span>:
<span className="text-primary/60 tracking-wide whitespace-nowrap">
{getPropertyNameTranslation(name)}
{name === 'source_id' && truncate && <sup className="text-red-500"></sup>}
</span>:
<Text
className="hover:bg-primary/20 rounded p-1 overflow-hidden text-ellipsis"
tooltipClassName="max-w-96 -translate-x-13"
@ -306,7 +316,7 @@ const NodePropertiesView = ({ node }: { node: NodeType }) => {
{Object.keys(node.properties)
.sort()
.map((name) => {
if (name === 'created_at') return null; // Hide created_at property
if (name === 'created_at' || name === 'truncate') return null; // Hide created_at and truncate properties
return (
<PropertyRow
key={name}
@ -316,6 +326,7 @@ const NodePropertiesView = ({ node }: { node: NodeType }) => {
entityId={node.properties['entity_id']}
entityType="node"
isEditable={name === 'description' || name === 'entity_id'}
truncate={node.properties['truncate']}
/>
)
})}
@ -373,7 +384,7 @@ const EdgePropertiesView = ({ edge }: { edge: EdgeType }) => {
{Object.keys(edge.properties)
.sort()
.map((name) => {
if (name === 'created_at') return null; // Hide created_at property
if (name === 'created_at' || name === 'truncate') return null; // Hide created_at and truncate properties
return (
<PropertyRow
key={name}
@ -385,6 +396,7 @@ const EdgePropertiesView = ({ edge }: { edge: EdgeType }) => {
sourceId={edge.sourceNode?.properties['entity_id'] || edge.source}
targetId={edge.targetNode?.properties['entity_id'] || edge.target}
isEditable={name === 'description' || name === 'keywords'}
truncate={edge.properties['truncate']}
/>
)
})}

View file

@ -318,10 +318,10 @@
"description": "الوصف", "description": "الوصف",
"entity_id": "الاسم", "entity_id": "الاسم",
"entity_type": "النوع", "entity_type": "النوع",
"source_id": "معرف المصدر", "source_id": "C-ID",
"Neighbour": "الجار", "Neighbour": "الجار",
"file_path": "المصدر", "file_path": "File",
"keywords": "الكلمات الرئيسية", "keywords": "Keyword",
"weight": "الوزن" "weight": "الوزن"
} }
}, },

View file

@ -318,9 +318,9 @@
"description": "Description", "description": "Description",
"entity_id": "Name", "entity_id": "Name",
"entity_type": "Type", "entity_type": "Type",
"source_id": "SrcID", "source_id": "C-ID",
"Neighbour": "Neigh", "Neighbour": "Neigh",
"file_path": "Source", "file_path": "File",
"keywords": "Keys", "keywords": "Keys",
"weight": "Weight" "weight": "Weight"
} }

View file

@ -318,9 +318,9 @@
"description": "Description", "description": "Description",
"entity_id": "Nom", "entity_id": "Nom",
"entity_type": "Type", "entity_type": "Type",
"source_id": "ID source", "source_id": "C-ID",
"Neighbour": "Voisin", "Neighbour": "Voisin",
"file_path": "Source", "file_path": "File",
"keywords": "Keys", "keywords": "Keys",
"weight": "Poids" "weight": "Poids"
} }

View file

@ -318,9 +318,9 @@
"description": "描述", "description": "描述",
"entity_id": "名称", "entity_id": "名称",
"entity_type": "类型", "entity_type": "类型",
"source_id": "信源ID", "source_id": "C-ID",
"Neighbour": "邻接", "Neighbour": "邻接",
"file_path": "信源", "file_path": "文件",
"keywords": "Keys", "keywords": "Keys",
"weight": "权重" "weight": "权重"
} }

View file

@ -318,9 +318,9 @@
"description": "描述", "description": "描述",
"entity_id": "名稱", "entity_id": "名稱",
"entity_type": "類型", "entity_type": "類型",
"source_id": "來源ID", "source_id": "C-ID",
"Neighbour": "鄰接", "Neighbour": "鄰接",
"file_path": "來源", "file_path": "檔案",
"keywords": "Keys", "keywords": "Keys",
"weight": "權重" "weight": "權重"
} }