Add entity/relation chunk tracking with configurable source ID limits

- Add entity_chunks & relation_chunks storage
- Implement KEEP/FIFO limit strategies
- Update env.example with new settings
- Add migration for chunk tracking data
- Support all KV storage

(cherry picked from commit dc62c78f98)
This commit is contained in:
yangdx 2025-10-20 15:24:15 +08:00 committed by Raphaël MANSUY
parent 7248e09fc4
commit cb5451faf8
10 changed files with 647 additions and 1615 deletions

View file

@ -29,7 +29,7 @@ WEBUI_DESCRIPTION="Simple and Fast Graph Based RAG System"
# OLLAMA_EMULATING_MODEL_NAME=lightrag
OLLAMA_EMULATING_MODEL_TAG=latest
### Max nodes for graph retrieval (Ensure WebUI local settings are also updated, which is limited to this value)
### Max nodes return from graph retrieval in webui
# MAX_GRAPH_NODES=1000
### Logging level
@ -50,8 +50,6 @@ OLLAMA_EMULATING_MODEL_TAG=latest
# JWT_ALGORITHM=HS256
### API-Key to access LightRAG Server API
### Use this key in HTTP requests with the 'X-API-Key' header
### Example: curl -H "X-API-Key: your-secure-api-key-here" http://localhost:9621/query
# LIGHTRAG_API_KEY=your-secure-api-key-here
# WHITELIST_PATHS=/health,/api/*
@ -75,6 +73,16 @@ ENABLE_LLM_CACHE=true
# MAX_RELATION_TOKENS=8000
### control the maximum tokens send to LLM (include entities, relations and chunks)
# MAX_TOTAL_TOKENS=30000
### control the maximum chunk_ids stored in vector and graph db
# MAX_SOURCE_IDS_PER_ENTITY=300
# MAX_SOURCE_IDS_PER_RELATION=300
### control chunk_ids limitation method: KEEP, FIFO (KEPP: Ingore New Chunks, FIFO: New chunks replace old chunks)
# SOURCE_IDS_LIMIT_METHOD=KEEP
### maximum number of related chunks per source entity or relation
### The chunk picker uses this value to determine the total number of chunks selected from KG(knowledge graph)
### Higher values increase re-ranking time
# RELATED_CHUNK_NUMBER=5
### chunk selection strategies
### VECTOR: Pick KG chunks by vector similarity, delivered chunks to the LLM aligning more closely with naive retrieval
@ -121,9 +129,6 @@ ENABLE_LLM_CACHE_FOR_EXTRACT=true
### Document processing output language: English, Chinese, French, German ...
SUMMARY_LANGUAGE=English
### PDF decryption password for protected PDF files
# PDF_DECRYPT_PASSWORD=your_pdf_password_here
### Entity types that the LLM will attempt to recognize
# ENTITY_TYPES='["Person", "Creature", "Organization", "Location", "Event", "Concept", "Method", "Content", "Data", "Artifact", "NaturalObject"]'
@ -140,22 +145,6 @@ SUMMARY_LANGUAGE=English
### Maximum context size sent to LLM for description summary
# SUMMARY_CONTEXT_SIZE=12000
### control the maximum chunk_ids stored in vector and graph db
# MAX_SOURCE_IDS_PER_ENTITY=300
# MAX_SOURCE_IDS_PER_RELATION=300
### control chunk_ids limitation method: FIFO, KEEP
### FIFO: First in first out
### KEEP: Keep oldest (less merge action and faster)
# SOURCE_IDS_LIMIT_METHOD=FIFO
# Maximum number of file paths stored in entity/relation file_path field (For displayed only, does not affect query performance)
# MAX_FILE_PATHS=100
### maximum number of related chunks per source entity or relation
### The chunk picker uses this value to determine the total number of chunks selected from KG(knowledge graph)
### Higher values increase re-ranking time
# RELATED_CHUNK_NUMBER=5
###############################
### Concurrency Configuration
###############################
@ -168,13 +157,10 @@ MAX_PARALLEL_INSERT=2
### Num of chunks send to Embedding in single request
# EMBEDDING_BATCH_NUM=10
###########################################################################
###########################################################
### LLM Configuration
### LLM_BINDING type: openai, ollama, lollms, azure_openai, aws_bedrock, gemini
### LLM_BINDING_HOST: host only for Ollama, endpoint for other LLM service
### If LightRAG deployed in Docker:
### uses host.docker.internal instead of localhost in LLM_BINDING_HOST
###########################################################################
### LLM_BINDING type: openai, ollama, lollms, azure_openai, aws_bedrock
###########################################################
### LLM request timeout setting for all llm (0 means no timeout for Ollma)
# LLM_TIMEOUT=180
@ -183,7 +169,7 @@ LLM_MODEL=gpt-4o
LLM_BINDING_HOST=https://api.openai.com/v1
LLM_BINDING_API_KEY=your_api_key
### Env vars for Azure openai
### Optional for Azure
# AZURE_OPENAI_API_VERSION=2024-08-01-preview
# AZURE_OPENAI_DEPLOYMENT=gpt-4o
@ -193,21 +179,18 @@ LLM_BINDING_API_KEY=your_api_key
# LLM_BINDING_API_KEY=your_api_key
# LLM_BINDING=openai
### Gemini example
# LLM_BINDING=gemini
# LLM_MODEL=gemini-flash-latest
# LLM_BINDING_API_KEY=your_gemini_api_key
# LLM_BINDING_HOST=https://generativelanguage.googleapis.com
### OpenAI Compatible API Specific Parameters
### Increased temperature values may mitigate infinite inference loops in certain LLM, such as Qwen3-30B.
# OPENAI_LLM_TEMPERATURE=0.9
### Set the max_tokens to mitigate endless output of some LLM (less than LLM_TIMEOUT * llm_output_tokens/second, i.e. 9000 = 180s * 50 tokens/s)
### Typically, max_tokens does not include prompt content, though some models, such as Gemini Models, are exceptions
### For vLLM/SGLang deployed models, or most of OpenAI compatible API provider
# OPENAI_LLM_MAX_TOKENS=9000
### For OpenAI o1-mini or newer modles
OPENAI_LLM_MAX_COMPLETION_TOKENS=9000
### use the following command to see all support options for OpenAI, azure_openai or OpenRouter
### lightrag-server --llm-binding gemini --help
### Gemini Specific Parameters
# GEMINI_LLM_MAX_OUTPUT_TOKENS=9000
# GEMINI_LLM_TEMPERATURE=0.7
### Enable Thinking
# GEMINI_LLM_THINKING_CONFIG='{"thinking_budget": -1, "include_thoughts": true}'
### Disable Thinking
# GEMINI_LLM_THINKING_CONFIG='{"thinking_budget": 0, "include_thoughts": false}'
#### OpenAI's new API utilizes max_completion_tokens instead of max_tokens
# OPENAI_LLM_MAX_COMPLETION_TOKENS=9000
### use the following command to see all support options for OpenAI, azure_openai or OpenRouter
### lightrag-server --llm-binding openai --help
@ -218,16 +201,6 @@ LLM_BINDING_API_KEY=your_api_key
### Qwen3 Specific Parameters deploy by vLLM
# OPENAI_LLM_EXTRA_BODY='{"chat_template_kwargs": {"enable_thinking": false}}'
### OpenAI Compatible API Specific Parameters
### Increased temperature values may mitigate infinite inference loops in certain LLM, such as Qwen3-30B.
# OPENAI_LLM_TEMPERATURE=0.9
### Set the max_tokens to mitigate endless output of some LLM (less than LLM_TIMEOUT * llm_output_tokens/second, i.e. 9000 = 180s * 50 tokens/s)
### Typically, max_tokens does not include prompt content
### For vLLM/SGLang deployed models, or most of OpenAI compatible API provider
# OPENAI_LLM_MAX_TOKENS=9000
### For OpenAI o1-mini or newer modles utilizes max_completion_tokens instead of max_tokens
OPENAI_LLM_MAX_COMPLETION_TOKENS=9000
### use the following command to see all support options for Ollama LLM
### lightrag-server --llm-binding ollama --help
### Ollama Server Specific Parameters
@ -241,37 +214,24 @@ OLLAMA_LLM_NUM_CTX=32768
### Bedrock Specific Parameters
# BEDROCK_LLM_TEMPERATURE=1.0
#######################################################################################
####################################################################################
### Embedding Configuration (Should not be changed after the first file processed)
### EMBEDDING_BINDING: ollama, openai, azure_openai, jina, lollms, aws_bedrock
### EMBEDDING_BINDING_HOST: host only for Ollama, endpoint for other Embedding service
### If LightRAG deployed in Docker:
### uses host.docker.internal instead of localhost in EMBEDDING_BINDING_HOST
#######################################################################################
####################################################################################
# EMBEDDING_TIMEOUT=30
### Control whether to send embedding_dim parameter to embedding API
### IMPORTANT: Jina ALWAYS sends dimension parameter (API requirement) - this setting is ignored for Jina
### For OpenAI: Set to 'true' to enable dynamic dimension adjustment
### For OpenAI: Set to 'false' (default) to disable sending dimension parameter
### Note: Automatically ignored for backends that don't support dimension parameter (e.g., Ollama)
# Ollama embedding
# EMBEDDING_BINDING=ollama
# EMBEDDING_MODEL=bge-m3:latest
# EMBEDDING_DIM=1024
# EMBEDDING_BINDING_API_KEY=your_api_key
### If LightRAG deployed in Docker uses host.docker.internal instead of localhost
# EMBEDDING_BINDING_HOST=http://localhost:11434
### OpenAI compatible embedding
EMBEDDING_BINDING=openai
EMBEDDING_MODEL=text-embedding-3-large
EMBEDDING_DIM=3072
EMBEDDING_SEND_DIM=false
EMBEDDING_TOKEN_LIMIT=8192
EMBEDDING_BINDING_HOST=https://api.openai.com/v1
EMBEDDING_BINDING=ollama
EMBEDDING_MODEL=bge-m3:latest
EMBEDDING_DIM=1024
EMBEDDING_BINDING_API_KEY=your_api_key
# If the embedding service is deployed within the same Docker stack, use host.docker.internal instead of localhost
EMBEDDING_BINDING_HOST=http://localhost:11434
### OpenAI compatible (VoyageAI embedding openai compatible)
# EMBEDDING_BINDING=openai
# EMBEDDING_MODEL=text-embedding-3-large
# EMBEDDING_DIM=3072
# EMBEDDING_BINDING_HOST=https://api.openai.com/v1
# EMBEDDING_BINDING_API_KEY=your_api_key
### Optional for Azure
# AZURE_EMBEDDING_DEPLOYMENT=text-embedding-3-large
@ -279,16 +239,6 @@ EMBEDDING_BINDING_API_KEY=your_api_key
# AZURE_EMBEDDING_ENDPOINT=your_endpoint
# AZURE_EMBEDDING_API_KEY=your_api_key
### Gemini embedding
# EMBEDDING_BINDING=gemini
# EMBEDDING_MODEL=gemini-embedding-001
# EMBEDDING_DIM=1536
# EMBEDDING_TOKEN_LIMIT=2048
# EMBEDDING_BINDING_HOST=https://generativelanguage.googleapis.com
# EMBEDDING_BINDING_API_KEY=your_api_key
### Gemini embedding requires sending dimension to server
# EMBEDDING_SEND_DIM=true
### Jina AI Embedding
# EMBEDDING_BINDING=jina
# EMBEDDING_BINDING_HOST=https://api.jina.ai/v1/embeddings
@ -349,8 +299,7 @@ POSTGRES_USER=your_username
POSTGRES_PASSWORD='your_password'
POSTGRES_DATABASE=your_database
POSTGRES_MAX_CONNECTIONS=12
### DB specific workspace should not be set, keep for compatible only
### POSTGRES_WORKSPACE=forced_workspace_name
# POSTGRES_WORKSPACE=forced_workspace_name
### PostgreSQL Vector Storage Configuration
### Vector storage type: HNSW, IVFFlat
@ -396,8 +345,7 @@ NEO4J_MAX_TRANSACTION_RETRY_TIME=30
NEO4J_MAX_CONNECTION_LIFETIME=300
NEO4J_LIVENESS_CHECK_TIMEOUT=30
NEO4J_KEEP_ALIVE=true
### DB specific workspace should not be set, keep for compatible only
### NEO4J_WORKSPACE=forced_workspace_name
# NEO4J_WORKSPACE=forced_workspace_name
### MongoDB Configuration
MONGO_URI=mongodb://root:root@localhost:27017/
@ -411,14 +359,12 @@ MILVUS_DB_NAME=lightrag
# MILVUS_USER=root
# MILVUS_PASSWORD=your_password
# MILVUS_TOKEN=your_token
### DB specific workspace should not be set, keep for compatible only
### MILVUS_WORKSPACE=forced_workspace_name
# MILVUS_WORKSPACE=forced_workspace_name
### Qdrant
QDRANT_URL=http://localhost:6333
# QDRANT_API_KEY=your-api-key
### DB specific workspace should not be set, keep for compatible only
### QDRANT_WORKSPACE=forced_workspace_name
# QDRANT_WORKSPACE=forced_workspace_name
### Redis
REDIS_URI=redis://localhost:6379
@ -426,45 +372,11 @@ REDIS_SOCKET_TIMEOUT=30
REDIS_CONNECT_TIMEOUT=10
REDIS_MAX_CONNECTIONS=100
REDIS_RETRY_ATTEMPTS=3
### DB specific workspace should not be set, keep for compatible only
### REDIS_WORKSPACE=forced_workspace_name
# REDIS_WORKSPACE=forced_workspace_name
### Memgraph Configuration
MEMGRAPH_URI=bolt://localhost:7687
MEMGRAPH_USERNAME=
MEMGRAPH_PASSWORD=
MEMGRAPH_DATABASE=memgraph
### DB specific workspace should not be set, keep for compatible only
### MEMGRAPH_WORKSPACE=forced_workspace_name
############################
### Evaluation Configuration
############################
### RAGAS evaluation models (used for RAG quality assessment)
### ⚠️ IMPORTANT: Both LLM and Embedding endpoints MUST be OpenAI-compatible
### Default uses OpenAI models for evaluation
### LLM Configuration for Evaluation
# EVAL_LLM_MODEL=gpt-4o-mini
### API key for LLM evaluation (fallback to OPENAI_API_KEY if not set)
# EVAL_LLM_BINDING_API_KEY=your_api_key
### Custom OpenAI-compatible endpoint for LLM evaluation (optional)
# EVAL_LLM_BINDING_HOST=https://api.openai.com/v1
### Embedding Configuration for Evaluation
# EVAL_EMBEDDING_MODEL=text-embedding-3-large
### API key for embeddings (fallback: EVAL_LLM_BINDING_API_KEY -> OPENAI_API_KEY)
# EVAL_EMBEDDING_BINDING_API_KEY=your_embedding_api_key
### Custom OpenAI-compatible endpoint for embeddings (fallback: EVAL_LLM_BINDING_HOST)
# EVAL_EMBEDDING_BINDING_HOST=https://api.openai.com/v1
### Performance Tuning
### Number of concurrent test case evaluations
### Lower values reduce API rate limit issues but increase evaluation time
# EVAL_MAX_CONCURRENT=2
### TOP_K query parameter of LightRAG (default: 10)
### Number of entities or relations retrieved from KG
# EVAL_QUERY_TOP_K=10
### LLM request retry and timeout settings for evaluation
# EVAL_LLM_MAX_RETRIES=5
# EVAL_LLM_TIMEOUT=180
# MEMGRAPH_WORKSPACE=forced_workspace_name

View file

@ -378,6 +378,14 @@ class BaseKVStorage(StorageNameSpace, ABC):
None
"""
@abstractmethod
async def is_empty(self) -> bool:
"""Check if the storage is empty
Returns:
bool: True if storage contains no data, False otherwise
"""
@dataclass
class BaseGraphStorage(StorageNameSpace, ABC):

View file

@ -13,7 +13,16 @@ DEFAULT_MAX_GRAPH_NODES = 1000
# Default values for extraction settings
DEFAULT_SUMMARY_LANGUAGE = "English" # Default language for document processing
DEFAULT_MAX_GLEANING = 1
DEFAULT_ENTITY_NAME_MAX_LENGTH = 256
DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 3
DEFAULT_MAX_SOURCE_IDS_PER_RELATION = 3
SOURCE_IDS_LIMIT_METHOD_KEEP = "KEEP"
SOURCE_IDS_LIMIT_METHOD_FIFO = "FIFO"
DEFAULT_SOURCE_IDS_LIMIT_METHOD = SOURCE_IDS_LIMIT_METHOD_KEEP
VALID_SOURCE_IDS_LIMIT_METHODS = {
SOURCE_IDS_LIMIT_METHOD_KEEP,
SOURCE_IDS_LIMIT_METHOD_FIFO,
}
# Number of description fragments to trigger LLM summary
DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE = 8
@ -38,7 +47,7 @@ DEFAULT_ENTITY_TYPES = [
"NaturalObject",
]
# Separator for: description, source_id and relation-key fields(Can not be changed after data inserted)
# Separator for graph fields
GRAPH_FIELD_SEP = "<SEP>"
# Query and retrieval configuration defaults
@ -58,27 +67,8 @@ DEFAULT_HISTORY_TURNS = 0
DEFAULT_MIN_RERANK_SCORE = 0.0
DEFAULT_RERANK_BINDING = "null"
# Default source ids limit in meta data for entity and relation
DEFAULT_MAX_SOURCE_IDS_PER_ENTITY = 300
DEFAULT_MAX_SOURCE_IDS_PER_RELATION = 300
### control chunk_ids limitation method: FIFO, FIFO
### FIFO: First in first out
### KEEP: Keep oldest (less merge action and faster)
SOURCE_IDS_LIMIT_METHOD_KEEP = "KEEP"
SOURCE_IDS_LIMIT_METHOD_FIFO = "FIFO"
DEFAULT_SOURCE_IDS_LIMIT_METHOD = SOURCE_IDS_LIMIT_METHOD_FIFO
VALID_SOURCE_IDS_LIMIT_METHODS = {
SOURCE_IDS_LIMIT_METHOD_KEEP,
SOURCE_IDS_LIMIT_METHOD_FIFO,
}
# Maximum number of file paths stored in entity/relation file_path field (For displayed only, does not affect query performance)
DEFAULT_MAX_FILE_PATHS = 100
# Field length of file_path in Milvus Schema for entity and relation (Should not be changed)
# file_path must store all file paths up to the DEFAULT_MAX_FILE_PATHS limit within the metadata.
# File path configuration for vector and graph database(Should not be changed, used in Milvus Schema)
DEFAULT_MAX_FILE_PATH_LENGTH = 32768
# Placeholder for more file paths in meta data for entity and relation (Should not be changed)
DEFAULT_FILE_PATH_MORE_PLACEHOLDER = "truncated"
# Default temperature for LLM
DEFAULT_TEMPERATURE = 1.0

View file

@ -13,7 +13,7 @@ from lightrag.utils import (
from lightrag.exceptions import StorageNotInitializedError
from .shared_storage import (
get_namespace_data,
get_namespace_lock,
get_storage_lock,
get_data_init_lock,
get_update_flag,
set_all_update_flags,
@ -46,20 +46,12 @@ class JsonKVStorage(BaseKVStorage):
async def initialize(self):
"""Initialize storage data"""
self._storage_lock = get_namespace_lock(
self.final_namespace, workspace=self.workspace
)
self.storage_updated = await get_update_flag(
self.final_namespace, workspace=self.workspace
)
self._storage_lock = get_storage_lock()
self.storage_updated = await get_update_flag(self.final_namespace)
async with get_data_init_lock():
# check need_init must before get_namespace_data
need_init = await try_initialize_namespace(
self.final_namespace, workspace=self.workspace
)
self._data = await get_namespace_data(
self.final_namespace, workspace=self.workspace
)
need_init = await try_initialize_namespace(self.final_namespace)
self._data = await get_namespace_data(self.final_namespace)
if need_init:
loaded_data = load_json(self._file_name) or {}
async with self._storage_lock:
@ -89,23 +81,8 @@ class JsonKVStorage(BaseKVStorage):
logger.debug(
f"[{self.workspace}] Process {os.getpid()} KV writting {data_count} records to {self.namespace}"
)
# Write JSON and check if sanitization was applied
needs_reload = write_json(data_dict, self._file_name)
# If data was sanitized, reload cleaned data to update shared memory
if needs_reload:
logger.info(
f"[{self.workspace}] Reloading sanitized data into shared memory for {self.namespace}"
)
cleaned_data = load_json(self._file_name)
if cleaned_data is not None:
self._data.clear()
self._data.update(cleaned_data)
await clear_all_update_flags(
self.final_namespace, workspace=self.workspace
)
write_json(data_dict, self._file_name)
await clear_all_update_flags(self.final_namespace)
async def get_by_id(self, id: str) -> dict[str, Any] | None:
async with self._storage_lock:
@ -178,7 +155,7 @@ class JsonKVStorage(BaseKVStorage):
v["_id"] = k
self._data.update(data)
await set_all_update_flags(self.final_namespace, workspace=self.workspace)
await set_all_update_flags(self.final_namespace)
async def delete(self, ids: list[str]) -> None:
"""Delete specific records from storage by their IDs
@ -201,9 +178,7 @@ class JsonKVStorage(BaseKVStorage):
any_deleted = True
if any_deleted:
await set_all_update_flags(
self.final_namespace, workspace=self.workspace
)
await set_all_update_flags(self.final_namespace)
async def is_empty(self) -> bool:
"""Check if the storage is empty
@ -231,9 +206,7 @@ class JsonKVStorage(BaseKVStorage):
try:
async with self._storage_lock:
self._data.clear()
await set_all_update_flags(
self.final_namespace, workspace=self.workspace
)
await set_all_update_flags(self.final_namespace)
await self.index_done_callback()
logger.info(
@ -251,7 +224,7 @@ class JsonKVStorage(BaseKVStorage):
data: Original data dictionary that may contain legacy structure
Returns:
Migrated data dictionary with flattened cache keys (sanitized if needed)
Migrated data dictionary with flattened cache keys
"""
from lightrag.utils import generate_cache_key
@ -288,17 +261,8 @@ class JsonKVStorage(BaseKVStorage):
logger.info(
f"[{self.workspace}] Migrated {migration_count} legacy cache entries to flattened structure"
)
# Persist migrated data immediately and check if sanitization was applied
needs_reload = write_json(migrated_data, self._file_name)
# If data was sanitized during write, reload cleaned data
if needs_reload:
logger.info(
f"[{self.workspace}] Reloading sanitized migration data for {self.namespace}"
)
cleaned_data = load_json(self._file_name)
if cleaned_data is not None:
return cleaned_data # Return cleaned data to update shared memory
# Persist migrated data immediately
write_json(migrated_data, self._file_name)
return migrated_data

View file

@ -174,22 +174,6 @@ class MongoKVStorage(BaseKVStorage):
existing_ids = {str(x["_id"]) async for x in cursor}
return keys - existing_ids
async def get_all(self) -> dict[str, Any]:
"""Get all data from storage
Returns:
Dictionary containing all stored data
"""
cursor = self._data.find({})
result = {}
async for doc in cursor:
doc_id = doc.pop("_id")
# Ensure time fields are present for all documents
doc.setdefault("create_time", 0)
doc.setdefault("update_time", 0)
result[doc_id] = doc
return result
async def upsert(self, data: dict[str, dict[str, Any]]) -> None:
logger.debug(f"[{self.workspace}] Inserting {len(data)} to {self.namespace}")
if not data:
@ -235,6 +219,20 @@ class MongoKVStorage(BaseKVStorage):
# Mongo handles persistence automatically
pass
async def is_empty(self) -> bool:
"""Check if the storage is empty for the current workspace and namespace
Returns:
bool: True if storage is empty, False otherwise
"""
try:
# Use count_documents with limit 1 for efficiency
count = await self._data.count_documents({}, limit=1)
return count == 0
except PyMongoError as e:
logger.error(f"[{self.workspace}] Error checking if storage is empty: {e}")
return True
async def delete(self, ids: list[str]) -> None:
"""Delete documents with specified IDs
@ -463,6 +461,20 @@ class MongoDocStatusStorage(DocStatusStorage):
# Mongo handles persistence automatically
pass
async def is_empty(self) -> bool:
"""Check if the storage is empty for the current workspace and namespace
Returns:
bool: True if storage is empty, False otherwise
"""
try:
# Use count_documents with limit 1 for efficiency
count = await self._data.count_documents({}, limit=1)
return count == 0
except PyMongoError as e:
logger.error(f"[{self.workspace}] Error checking if storage is empty: {e}")
return True
async def drop(self) -> dict[str, str]:
"""Drop the storage by removing all documents in the collection.

View file

@ -21,7 +21,7 @@ from lightrag.base import (
DocStatus,
DocProcessingStatus,
)
from ..kg.shared_storage import get_data_init_lock
from ..kg.shared_storage import get_data_init_lock, get_storage_lock
import json
# Import tenacity for retry logic
@ -153,7 +153,7 @@ class RedisKVStorage(BaseKVStorage):
else:
# When workspace is empty, final_namespace equals original namespace
self.final_namespace = self.namespace
self.workspace = ""
self.workspace = "_"
logger.debug(f"Final namespace (no workspace): '{self.final_namespace}'")
self._redis_url = os.environ.get(
@ -368,13 +368,12 @@ class RedisKVStorage(BaseKVStorage):
Returns:
bool: True if storage is empty, False otherwise
"""
pattern = f"{self.final_namespace}:*"
pattern = f"{self.namespace}:{self.workspace}:*"
try:
async with self._get_redis_connection() as redis:
# Use scan to check if any keys exist
async for key in redis.scan_iter(match=pattern, count=1):
return False # Found at least one key
return True # No keys found
# Use scan to check if any keys exist
async for key in self.redis.scan_iter(match=pattern, count=1):
return False # Found at least one key
return True # No keys found
except Exception as e:
logger.error(f"[{self.workspace}] Error checking if storage is empty: {e}")
return True
@ -401,39 +400,42 @@ class RedisKVStorage(BaseKVStorage):
Returns:
dict[str, str]: Status of the operation with keys 'status' and 'message'
"""
async with self._get_redis_connection() as redis:
try:
# Use SCAN to find all keys with the namespace prefix
pattern = f"{self.final_namespace}:*"
cursor = 0
deleted_count = 0
async with get_storage_lock():
async with self._get_redis_connection() as redis:
try:
# Use SCAN to find all keys with the namespace prefix
pattern = f"{self.final_namespace}:*"
cursor = 0
deleted_count = 0
while True:
cursor, keys = await redis.scan(cursor, match=pattern, count=1000)
if keys:
# Delete keys in batches
pipe = redis.pipeline()
for key in keys:
pipe.delete(key)
results = await pipe.execute()
deleted_count += sum(results)
while True:
cursor, keys = await redis.scan(
cursor, match=pattern, count=1000
)
if keys:
# Delete keys in batches
pipe = redis.pipeline()
for key in keys:
pipe.delete(key)
results = await pipe.execute()
deleted_count += sum(results)
if cursor == 0:
break
if cursor == 0:
break
logger.info(
f"[{self.workspace}] Dropped {deleted_count} keys from {self.namespace}"
)
return {
"status": "success",
"message": f"{deleted_count} keys dropped",
}
logger.info(
f"[{self.workspace}] Dropped {deleted_count} keys from {self.namespace}"
)
return {
"status": "success",
"message": f"{deleted_count} keys dropped",
}
except Exception as e:
logger.error(
f"[{self.workspace}] Error dropping keys from {self.namespace}: {e}"
)
return {"status": "error", "message": str(e)}
except Exception as e:
logger.error(
f"[{self.workspace}] Error dropping keys from {self.namespace}: {e}"
)
return {"status": "error", "message": str(e)}
async def _migrate_legacy_cache_structure(self):
"""Migrate legacy nested cache structure to flattened structure for Redis
@ -1088,32 +1090,35 @@ class RedisDocStatusStorage(DocStatusStorage):
async def drop(self) -> dict[str, str]:
"""Drop all document status data from storage and clean up resources"""
try:
async with self._get_redis_connection() as redis:
# Use SCAN to find all keys with the namespace prefix
pattern = f"{self.final_namespace}:*"
cursor = 0
deleted_count = 0
async with get_storage_lock():
try:
async with self._get_redis_connection() as redis:
# Use SCAN to find all keys with the namespace prefix
pattern = f"{self.final_namespace}:*"
cursor = 0
deleted_count = 0
while True:
cursor, keys = await redis.scan(cursor, match=pattern, count=1000)
if keys:
# Delete keys in batches
pipe = redis.pipeline()
for key in keys:
pipe.delete(key)
results = await pipe.execute()
deleted_count += sum(results)
while True:
cursor, keys = await redis.scan(
cursor, match=pattern, count=1000
)
if keys:
# Delete keys in batches
pipe = redis.pipeline()
for key in keys:
pipe.delete(key)
results = await pipe.execute()
deleted_count += sum(results)
if cursor == 0:
break
if cursor == 0:
break
logger.info(
f"[{self.workspace}] Dropped {deleted_count} doc status keys from {self.namespace}"
logger.info(
f"[{self.workspace}] Dropped {deleted_count} doc status keys from {self.namespace}"
)
return {"status": "success", "message": "data dropped"}
except Exception as e:
logger.error(
f"[{self.workspace}] Error dropping doc status {self.namespace}: {e}"
)
return {"status": "success", "message": "data dropped"}
except Exception as e:
logger.error(
f"[{self.workspace}] Error dropping doc status {self.namespace}: {e}"
)
return {"status": "error", "message": str(e)}
return {"status": "error", "message": str(e)}

View file

@ -3,7 +3,6 @@ from __future__ import annotations
import traceback
import asyncio
import configparser
import inspect
import os
import time
import warnings
@ -13,7 +12,6 @@ from functools import partial
from typing import (
Any,
AsyncIterator,
Awaitable,
Callable,
Iterator,
cast,
@ -22,10 +20,8 @@ from typing import (
Optional,
List,
Dict,
Union,
)
from lightrag.prompt import PROMPTS
from lightrag.exceptions import PipelineCancelledException
from lightrag.constants import (
DEFAULT_MAX_GLEANING,
DEFAULT_FORCE_LLM_SUMMARY_ON_MERGE,
@ -51,8 +47,6 @@ from lightrag.constants import (
DEFAULT_LLM_TIMEOUT,
DEFAULT_EMBEDDING_TIMEOUT,
DEFAULT_SOURCE_IDS_LIMIT_METHOD,
DEFAULT_MAX_FILE_PATHS,
DEFAULT_FILE_PATH_MORE_PLACEHOLDER,
)
from lightrag.utils import get_env_value
@ -64,10 +58,9 @@ from lightrag.kg import (
from lightrag.kg.shared_storage import (
get_namespace_data,
get_pipeline_status_lock,
get_graph_db_lock,
get_data_init_lock,
get_default_workspace,
set_default_workspace,
get_namespace_lock,
)
from lightrag.base import (
@ -91,7 +84,7 @@ from lightrag.operate import (
merge_nodes_and_edges,
kg_query,
naive_query,
rebuild_knowledge_from_chunks,
_rebuild_knowledge_from_chunks,
)
from lightrag.constants import GRAPH_FIELD_SEP
from lightrag.utils import (
@ -247,13 +240,11 @@ class LightRAG:
int,
int,
],
Union[List[Dict[str, Any]], Awaitable[List[Dict[str, Any]]]],
List[Dict[str, Any]],
] = field(default_factory=lambda: chunking_by_token_size)
"""
Custom chunking function for splitting text into chunks before processing.
The function can be either synchronous or asynchronous.
The function should take the following parameters:
- `tokenizer`: A Tokenizer instance to use for tokenization.
@ -263,8 +254,7 @@ class LightRAG:
- `chunk_token_size`: The maximum number of tokens per chunk.
- `chunk_overlap_token_size`: The number of overlapping tokens between consecutive chunks.
The function should return a list of dictionaries (or an awaitable that resolves to a list),
where each dictionary contains the following keys:
The function should return a list of dictionaries, where each dictionary contains the following keys:
- `tokens`: The number of tokens in the chunk.
- `content`: The text content of the chunk.
@ -277,9 +267,6 @@ class LightRAG:
embedding_func: EmbeddingFunc | None = field(default=None)
"""Function for computing text embeddings. Must be set before use."""
embedding_token_limit: int | None = field(default=None, init=False)
"""Token limit for embedding model. Set automatically from embedding_func.max_token_size in __post_init__."""
embedding_batch_num: int = field(default=int(os.getenv("EMBEDDING_BATCH_NUM", 10)))
"""Batch size for embedding computations."""
@ -406,14 +393,6 @@ class LightRAG:
)
"""Strategy for enforcing source_id limits: IGNORE_NEW or FIFO."""
max_file_paths: int = field(
default=get_env_value("MAX_FILE_PATHS", DEFAULT_MAX_FILE_PATHS, int)
)
"""Maximum number of file paths to store in entity/relation file_path field."""
file_path_more_placeholder: str = field(default=DEFAULT_FILE_PATH_MORE_PLACEHOLDER)
"""Placeholder text when file paths exceed max_file_paths limit."""
addon_params: dict[str, Any] = field(
default_factory=lambda: {
"language": get_env_value(
@ -523,16 +502,6 @@ class LightRAG:
logger.debug(f"LightRAG init with param:\n {_print_config}\n")
# Init Embedding
# Step 1: Capture max_token_size before applying decorator (decorator strips dataclass attributes)
embedding_max_token_size = None
if self.embedding_func and hasattr(self.embedding_func, "max_token_size"):
embedding_max_token_size = self.embedding_func.max_token_size
logger.debug(
f"Captured embedding max_token_size: {embedding_max_token_size}"
)
self.embedding_token_limit = embedding_max_token_size
# Step 2: Apply priority wrapper decorator
self.embedding_func = priority_limit_async_func_call(
self.embedding_func_max_async,
llm_timeout=self.default_embedding_timeout,
@ -659,22 +628,6 @@ class LightRAG:
async def initialize_storages(self):
"""Storage initialization must be called one by one to prevent deadlock"""
if self._storages_status == StoragesStatus.CREATED:
# Set the first initialized workspace will set the default workspace
# Allows namespace operation without specifying workspace for backward compatibility
default_workspace = get_default_workspace()
if default_workspace is None:
set_default_workspace(self.workspace)
elif default_workspace != self.workspace:
logger.warning(
f"Creating LightRAG instance with workspace='{self.workspace}' "
f"but default workspace is already set to '{default_workspace}'."
)
# Auto-initialize pipeline_status for this workspace
from lightrag.kg.shared_storage import initialize_pipeline_status
await initialize_pipeline_status(workspace=self.workspace)
for storage in (
self.full_docs,
self.text_chunks,
@ -746,7 +699,7 @@ class LightRAG:
async def check_and_migrate_data(self):
"""Check if data migration is needed and perform migration if necessary"""
async with get_data_init_lock():
async with get_data_init_lock(enable_logging=True):
try:
# Check if migration is needed:
# 1. chunk_entity_relation_graph has entities and relations (count > 0)
@ -924,13 +877,13 @@ class LightRAG:
need_entity_migration = await self.entity_chunks.is_empty()
except Exception as exc: # pragma: no cover - defensive logging
logger.error(f"Failed to check entity chunks storage: {exc}")
raise exc
need_entity_migration = True
try:
need_relation_migration = await self.relation_chunks.is_empty()
except Exception as exc: # pragma: no cover - defensive logging
logger.error(f"Failed to check relation chunks storage: {exc}")
raise exc
need_relation_migration = True
if not need_entity_migration and not need_relation_migration:
return
@ -1609,12 +1562,8 @@ class LightRAG:
"""
# Get pipeline status shared data and lock
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=self.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=self.workspace
)
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
# Check if another process is already processing the queue
async with pipeline_status_lock:
@ -1644,7 +1593,6 @@ class LightRAG:
"batchs": 0, # Total number of files to be processed
"cur_batch": 0, # Number of files already processed
"request_pending": False, # Clear any previous request
"cancellation_requested": False, # Initialize cancellation flag
"latest_message": "",
}
)
@ -1661,22 +1609,6 @@ class LightRAG:
try:
# Process documents until no more documents or requests
while True:
# Check for cancellation request at the start of main loop
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
# Clear pending request
pipeline_status["request_pending"] = False
# Celar cancellation flag
pipeline_status["cancellation_requested"] = False
log_message = "Pipeline cancelled by user"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# Exit directly, skipping request_pending check
return
if not to_process_docs:
log_message = "All enqueued documents have been processed"
logger.info(log_message)
@ -1739,25 +1671,14 @@ class LightRAG:
semaphore: asyncio.Semaphore,
) -> None:
"""Process single document"""
# Initialize variables at the start to prevent UnboundLocalError in error handling
file_path = "unknown_source"
current_file_number = 0
file_extraction_stage_ok = False
processing_start_time = int(time.time())
first_stage_tasks = []
entity_relation_task = None
async with semaphore:
nonlocal processed_count
current_file_number = 0
# Initialize to prevent UnboundLocalError in error handling
first_stage_tasks = []
entity_relation_task = None
try:
# Check for cancellation before starting document processing
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException("User cancelled")
# Get file path from status document
file_path = getattr(
status_doc, "file_path", "unknown_source"
@ -1796,28 +1717,7 @@ class LightRAG:
)
content = content_data["content"]
# Call chunking function, supporting both sync and async implementations
chunking_result = self.chunking_func(
self.tokenizer,
content,
split_by_character,
split_by_character_only,
self.chunk_overlap_token_size,
self.chunk_token_size,
)
# If result is awaitable, await to get actual result
if inspect.isawaitable(chunking_result):
chunking_result = await chunking_result
# Validate return type
if not isinstance(chunking_result, (list, tuple)):
raise TypeError(
f"chunking_func must return a list or tuple of dicts, "
f"got {type(chunking_result)}"
)
# Build chunks dictionary
# Generate chunks from document
chunks: dict[str, Any] = {
compute_mdhash_id(dp["content"], prefix="chunk-"): {
**dp,
@ -1825,7 +1725,14 @@ class LightRAG:
"file_path": file_path, # Add file path to each chunk
"llm_cache_list": [], # Initialize empty LLM cache list for each chunk
}
for dp in chunking_result
for dp in self.chunking_func(
self.tokenizer,
content,
split_by_character,
split_by_character_only,
self.chunk_overlap_token_size,
self.chunk_token_size,
)
}
if not chunks:
@ -1834,11 +1741,6 @@ class LightRAG:
# Record processing start time
processing_start_time = int(time.time())
# Check for cancellation before entity extraction
async with pipeline_status_lock:
if pipeline_status.get("cancellation_requested", False):
raise PipelineCancelledException("User cancelled")
# Process document in two stages
# Stage 1: Process text chunks and docs (parallel execution)
doc_status_task = asyncio.create_task(
@ -1889,33 +1791,20 @@ class LightRAG:
chunks, pipeline_status, pipeline_status_lock
)
)
chunk_results = await entity_relation_task
await entity_relation_task
file_extraction_stage_ok = True
except Exception as e:
# Check if this is a user cancellation
if isinstance(e, PipelineCancelledException):
# User cancellation - log brief message only, no traceback
error_msg = f"User cancelled {current_file_number}/{total_files}: {file_path}"
logger.warning(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
error_msg
)
else:
# Other exceptions - log with traceback
logger.error(traceback.format_exc())
error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Log error and update pipeline status
logger.error(traceback.format_exc())
error_msg = f"Failed to extract document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(error_msg)
# Cancel tasks that are not yet completed
all_tasks = first_stage_tasks + (
@ -1925,14 +1814,9 @@ class LightRAG:
if task and not task.done():
task.cancel()
# Persistent llm cache with error handling
# Persistent llm cache
if self.llm_response_cache:
try:
await self.llm_response_cache.index_done_callback()
except Exception as persist_error:
logger.error(
f"Failed to persist LLM cache: {persist_error}"
)
await self.llm_response_cache.index_done_callback()
# Record processing end time for failed case
processing_end_time = int(time.time())
@ -1962,16 +1846,8 @@ class LightRAG:
# Concurrency is controlled by keyed lock for individual entities and relationships
if file_extraction_stage_ok:
try:
# Check for cancellation before merge
async with pipeline_status_lock:
if pipeline_status.get(
"cancellation_requested", False
):
raise PipelineCancelledException(
"User cancelled"
)
# Use chunk_results from entity_relation_task
# Get chunk_results from entity_relation_task
chunk_results = await entity_relation_task
await merge_nodes_and_edges(
chunk_results=chunk_results, # result collected from entity_relation_task
knowledge_graph_inst=self.chunk_entity_relation_graph,
@ -2028,38 +1904,22 @@ class LightRAG:
)
except Exception as e:
# Check if this is a user cancellation
if isinstance(e, PipelineCancelledException):
# User cancellation - log brief message only, no traceback
error_msg = f"User cancelled during merge {current_file_number}/{total_files}: {file_path}"
logger.warning(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
error_msg
)
else:
# Other exceptions - log with traceback
logger.error(traceback.format_exc())
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Log error and update pipeline status
logger.error(traceback.format_exc())
error_msg = f"Merging stage failed in document {current_file_number}/{total_files}: {file_path}"
logger.error(error_msg)
async with pipeline_status_lock:
pipeline_status["latest_message"] = error_msg
pipeline_status["history_messages"].append(
traceback.format_exc()
)
pipeline_status["history_messages"].append(
error_msg
)
# Persistent llm cache with error handling
# Persistent llm cache
if self.llm_response_cache:
try:
await self.llm_response_cache.index_done_callback()
except Exception as persist_error:
logger.error(
f"Failed to persist LLM cache: {persist_error}"
)
await self.llm_response_cache.index_done_callback()
# Record processing end time for failed case
processing_end_time = int(time.time())
@ -2100,19 +1960,7 @@ class LightRAG:
)
# Wait for all document processing to complete
try:
await asyncio.gather(*doc_tasks)
except PipelineCancelledException:
# Cancel all remaining tasks
for task in doc_tasks:
if not task.done():
task.cancel()
# Wait for all tasks to complete cancellation
await asyncio.wait(doc_tasks, return_when=asyncio.ALL_COMPLETED)
# Exit directly (document statuses already updated in process_document)
return
await asyncio.gather(*doc_tasks)
# Check if there's a pending request to process more documents (with lock)
has_pending_request = False
@ -2143,14 +1991,11 @@ class LightRAG:
to_process_docs.update(pending_docs)
finally:
log_message = "Enqueued document processing pipeline stopped"
log_message = "Enqueued document processing pipeline stoped"
logger.info(log_message)
# Always reset busy status and cancellation flag when done or if an exception occurs (with lock)
# Always reset busy status when done or if an exception occurs (with lock)
async with pipeline_status_lock:
pipeline_status["busy"] = False
pipeline_status["cancellation_requested"] = (
False # Always reset cancellation flag
)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
@ -2938,9 +2783,7 @@ class LightRAG:
# Return the dictionary containing statuses only for the found document IDs
return found_statuses
async def adelete_by_doc_id(
self, doc_id: str, delete_llm_cache: bool = False
) -> DeletionResult:
async def adelete_by_doc_id(self, doc_id: str) -> DeletionResult:
"""Delete a document and all its related data, including chunks, graph elements.
This method orchestrates a comprehensive deletion process for a given document ID.
@ -2950,8 +2793,6 @@ class LightRAG:
Args:
doc_id (str): The unique identifier of the document to be deleted.
delete_llm_cache (bool): Whether to delete cached LLM extraction results
associated with the document. Defaults to False.
Returns:
DeletionResult: An object containing the outcome of the deletion process.
@ -2961,55 +2802,12 @@ class LightRAG:
- `status_code` (int): HTTP status code (e.g., 200, 404, 500).
- `file_path` (str | None): The file path of the deleted document, if available.
"""
# Get pipeline status shared data and lock for validation
pipeline_status = await get_namespace_data(
"pipeline_status", workspace=self.workspace
)
pipeline_status_lock = get_namespace_lock(
"pipeline_status", workspace=self.workspace
)
# Track whether WE acquired the pipeline
we_acquired_pipeline = False
# Check and acquire pipeline if needed
async with pipeline_status_lock:
if not pipeline_status.get("busy", False):
# Pipeline is idle - WE acquire it for this deletion
we_acquired_pipeline = True
pipeline_status.update(
{
"busy": True,
"job_name": "Deleting 1 document",
"job_start": datetime.now(timezone.utc).isoformat(),
"docs": 1,
"batchs": 1,
"cur_batch": 0,
"request_pending": False,
"cancellation_requested": False,
"latest_message": f"Starting deletion for document: {doc_id}",
}
)
# Initialize history messages
pipeline_status["history_messages"][:] = [
f"Starting deletion for document: {doc_id}"
]
else:
# Pipeline already busy - verify it's a deletion job
job_name = pipeline_status.get("job_name", "").lower()
if "deleting" not in job_name or "document" not in job_name:
return DeletionResult(
status="not_allowed",
doc_id=doc_id,
message=f"Deletion not allowed: current job '{pipeline_status.get('job_name')}' is not a document deletion job",
status_code=403,
file_path=None,
)
# Pipeline is busy with deletion - proceed without acquiring
deletion_operations_started = False
original_exception = None
doc_llm_cache_ids: list[str] = []
# Get pipeline status shared data and lock for status updates
pipeline_status = await get_namespace_data("pipeline_status")
pipeline_status_lock = get_pipeline_status_lock()
async with pipeline_status_lock:
log_message = f"Starting deletion process for document {doc_id}"
@ -3106,57 +2904,6 @@ class LightRAG:
# Mark that deletion operations have started
deletion_operations_started = True
if delete_llm_cache and chunk_ids:
if not self.llm_response_cache:
logger.info(
"Skipping LLM cache collection for document %s because cache storage is unavailable",
doc_id,
)
elif not self.text_chunks:
logger.info(
"Skipping LLM cache collection for document %s because text chunk storage is unavailable",
doc_id,
)
else:
try:
chunk_data_list = await self.text_chunks.get_by_ids(
list(chunk_ids)
)
seen_cache_ids: set[str] = set()
for chunk_data in chunk_data_list:
if not chunk_data or not isinstance(chunk_data, dict):
continue
cache_ids = chunk_data.get("llm_cache_list", [])
if not isinstance(cache_ids, list):
continue
for cache_id in cache_ids:
if (
isinstance(cache_id, str)
and cache_id
and cache_id not in seen_cache_ids
):
doc_llm_cache_ids.append(cache_id)
seen_cache_ids.add(cache_id)
if doc_llm_cache_ids:
logger.info(
"Collected %d LLM cache entries for document %s",
len(doc_llm_cache_ids),
doc_id,
)
else:
logger.info(
"No LLM cache entries found for document %s", doc_id
)
except Exception as cache_collect_error:
logger.error(
"Failed to collect LLM cache ids for document %s: %s",
doc_id,
cache_collect_error,
)
raise Exception(
f"Failed to collect LLM cache ids for document {doc_id}: {cache_collect_error}"
) from cache_collect_error
# 4. Analyze entities and relationships that will be affected
entities_to_delete = set()
entities_to_rebuild = {} # entity_name -> remaining chunk id list
@ -3242,9 +2989,6 @@ class LightRAG:
]
if not existing_sources:
# No chunk references means this entity should be deleted
entities_to_delete.add(node_label)
entity_chunk_updates[node_label] = []
continue
remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
@ -3266,7 +3010,6 @@ class LightRAG:
# Process relationships
for edge_data in affected_edges:
# source target is not in normalize order in graph db property
src = edge_data.get("source")
tgt = edge_data.get("target")
@ -3303,9 +3046,6 @@ class LightRAG:
]
if not existing_sources:
# No chunk references means this relationship should be deleted
relationships_to_delete.add(edge_tuple)
relation_chunk_updates[edge_tuple] = []
continue
remaining_sources = subtract_source_ids(existing_sources, chunk_ids)
@ -3331,31 +3071,38 @@ class LightRAG:
if entity_chunk_updates and self.entity_chunks:
entity_upsert_payload = {}
entity_delete_ids: set[str] = set()
for entity_name, remaining in entity_chunk_updates.items():
if not remaining:
# Empty entities are deleted alongside graph nodes later
continue
entity_upsert_payload[entity_name] = {
"chunk_ids": remaining,
"count": len(remaining),
"updated_at": current_time,
}
entity_delete_ids.add(entity_name)
else:
entity_upsert_payload[entity_name] = {
"chunk_ids": remaining,
"count": len(remaining),
"updated_at": current_time,
}
if entity_delete_ids:
await self.entity_chunks.delete(list(entity_delete_ids))
if entity_upsert_payload:
await self.entity_chunks.upsert(entity_upsert_payload)
if relation_chunk_updates and self.relation_chunks:
relation_upsert_payload = {}
relation_delete_ids: set[str] = set()
for edge_tuple, remaining in relation_chunk_updates.items():
if not remaining:
# Empty relations are deleted alongside graph edges later
continue
storage_key = make_relation_chunk_key(*edge_tuple)
relation_upsert_payload[storage_key] = {
"chunk_ids": remaining,
"count": len(remaining),
"updated_at": current_time,
}
if not remaining:
relation_delete_ids.add(storage_key)
else:
relation_upsert_payload[storage_key] = {
"chunk_ids": remaining,
"count": len(remaining),
"updated_at": current_time,
}
if relation_delete_ids:
await self.relation_chunks.delete(list(relation_delete_ids))
if relation_upsert_payload:
await self.relation_chunks.upsert(relation_upsert_payload)
@ -3363,111 +3110,56 @@ class LightRAG:
logger.error(f"Failed to process graph analysis results: {e}")
raise Exception(f"Failed to process graph dependencies: {e}") from e
# Data integrity is ensured by allowing only one process to hold pipeline at a timeno graph db lock is needed anymore)
# Use graph database lock to prevent dirty read
graph_db_lock = get_graph_db_lock(enable_logging=False)
async with graph_db_lock:
# 5. Delete chunks from storage
if chunk_ids:
try:
await self.chunks_vdb.delete(chunk_ids)
await self.text_chunks.delete(chunk_ids)
# 5. Delete chunks from storage
if chunk_ids:
try:
await self.chunks_vdb.delete(chunk_ids)
await self.text_chunks.delete(chunk_ids)
async with pipeline_status_lock:
log_message = f"Successfully deleted {len(chunk_ids)} chunks from storage"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
async with pipeline_status_lock:
log_message = (
f"Successfully deleted {len(chunk_ids)} chunks from storage"
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to delete chunks: {e}")
raise Exception(f"Failed to delete document chunks: {e}") from e
except Exception as e:
logger.error(f"Failed to delete chunks: {e}")
raise Exception(f"Failed to delete document chunks: {e}") from e
# 6. Delete relationships that have no remaining sources
if relationships_to_delete:
try:
# Delete from relation vdb
rel_ids_to_delete = []
for src, tgt in relationships_to_delete:
rel_ids_to_delete.extend(
[
compute_mdhash_id(src + tgt, prefix="rel-"),
compute_mdhash_id(tgt + src, prefix="rel-"),
]
)
await self.relationships_vdb.delete(rel_ids_to_delete)
# Delete from graph
await self.chunk_entity_relation_graph.remove_edges(
list(relationships_to_delete)
)
# Delete from relation_chunks storage
if self.relation_chunks:
relation_storage_keys = [
make_relation_chunk_key(src, tgt)
for src, tgt in relationships_to_delete
# 6. Delete entities that have no remaining sources
if entities_to_delete:
try:
# Delete from vector database
entity_vdb_ids = [
compute_mdhash_id(entity, prefix="ent-")
for entity in entities_to_delete
]
await self.relation_chunks.delete(relation_storage_keys)
await self.entities_vdb.delete(entity_vdb_ids)
async with pipeline_status_lock:
log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to delete relationships: {e}")
raise Exception(f"Failed to delete relationships: {e}") from e
# 7. Delete entities that have no remaining sources
if entities_to_delete:
try:
# Batch get all edges for entities to avoid N+1 query problem
nodes_edges_dict = (
await self.chunk_entity_relation_graph.get_nodes_edges_batch(
# Delete from graph
await self.chunk_entity_relation_graph.remove_nodes(
list(entities_to_delete)
)
)
# Debug: Check and log all edges before deleting nodes
edges_to_delete = set()
edges_still_exist = 0
async with pipeline_status_lock:
log_message = f"Successfully deleted {len(entities_to_delete)} entities"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
for entity, edges in nodes_edges_dict.items():
if edges:
for src, tgt in edges:
# Normalize edge representation (sorted for consistency)
edge_tuple = tuple(sorted((src, tgt)))
edges_to_delete.add(edge_tuple)
except Exception as e:
logger.error(f"Failed to delete entities: {e}")
raise Exception(f"Failed to delete entities: {e}") from e
if (
src in entities_to_delete
and tgt in entities_to_delete
):
logger.warning(
f"Edge still exists: {src} <-> {tgt}"
)
elif src in entities_to_delete:
logger.warning(
f"Edge still exists: {src} --> {tgt}"
)
else:
logger.warning(
f"Edge still exists: {src} <-- {tgt}"
)
edges_still_exist += 1
if edges_still_exist:
logger.warning(
f"⚠️ {edges_still_exist} entities still has edges before deletion"
)
# Clean residual edges from VDB and storage before deleting nodes
if edges_to_delete:
# Delete from relationships_vdb
# 7. Delete relationships that have no remaining sources
if relationships_to_delete:
try:
# Delete from vector database
rel_ids_to_delete = []
for src, tgt in edges_to_delete:
for src, tgt in relationships_to_delete:
rel_ids_to_delete.extend(
[
compute_mdhash_id(src + tgt, prefix="rel-"),
@ -3476,53 +3168,28 @@ class LightRAG:
)
await self.relationships_vdb.delete(rel_ids_to_delete)
# Delete from relation_chunks storage
if self.relation_chunks:
relation_storage_keys = [
make_relation_chunk_key(src, tgt)
for src, tgt in edges_to_delete
]
await self.relation_chunks.delete(relation_storage_keys)
logger.info(
f"Cleaned {len(edges_to_delete)} residual edges from VDB and chunk-tracking storage"
# Delete from graph
await self.chunk_entity_relation_graph.remove_edges(
list(relationships_to_delete)
)
# Delete from graph (edges will be auto-deleted with nodes)
await self.chunk_entity_relation_graph.remove_nodes(
list(entities_to_delete)
)
async with pipeline_status_lock:
log_message = f"Successfully deleted {len(relationships_to_delete)} relations"
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
# Delete from vector vdb
entity_vdb_ids = [
compute_mdhash_id(entity, prefix="ent-")
for entity in entities_to_delete
]
await self.entities_vdb.delete(entity_vdb_ids)
except Exception as e:
logger.error(f"Failed to delete relationships: {e}")
raise Exception(f"Failed to delete relationships: {e}") from e
# Delete from entity_chunks storage
if self.entity_chunks:
await self.entity_chunks.delete(list(entities_to_delete))
async with pipeline_status_lock:
log_message = (
f"Successfully deleted {len(entities_to_delete)} entities"
)
logger.info(log_message)
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
except Exception as e:
logger.error(f"Failed to delete entities: {e}")
raise Exception(f"Failed to delete entities: {e}") from e
# Persist changes to graph database before entity and relationship rebuild
await self._insert_done()
# Persist changes to graph database before releasing graph database lock
await self._insert_done()
# 8. Rebuild entities and relationships from remaining chunks
if entities_to_rebuild or relationships_to_rebuild:
try:
await rebuild_knowledge_from_chunks(
await _rebuild_knowledge_from_chunks(
entities_to_rebuild=entities_to_rebuild,
relationships_to_rebuild=relationships_to_rebuild,
knowledge_graph_inst=self.chunk_entity_relation_graph,
@ -3559,23 +3226,6 @@ class LightRAG:
logger.error(f"Failed to delete document and status: {e}")
raise Exception(f"Failed to delete document and status: {e}") from e
if delete_llm_cache and doc_llm_cache_ids and self.llm_response_cache:
try:
await self.llm_response_cache.delete(doc_llm_cache_ids)
cache_log_message = f"Successfully deleted {len(doc_llm_cache_ids)} LLM cache entries for document {doc_id}"
logger.info(cache_log_message)
async with pipeline_status_lock:
pipeline_status["latest_message"] = cache_log_message
pipeline_status["history_messages"].append(cache_log_message)
log_message = cache_log_message
except Exception as cache_delete_error:
log_message = f"Failed to delete LLM cache for document {doc_id}: {cache_delete_error}"
logger.error(log_message)
logger.error(traceback.format_exc())
async with pipeline_status_lock:
pipeline_status["latest_message"] = log_message
pipeline_status["history_messages"].append(log_message)
return DeletionResult(
status="success",
doc_id=doc_id,
@ -3623,18 +3273,6 @@ class LightRAG:
f"No deletion operations were started for document {doc_id}, skipping persistence"
)
# Release pipeline only if WE acquired it
if we_acquired_pipeline:
async with pipeline_status_lock:
pipeline_status["busy"] = False
pipeline_status["cancellation_requested"] = False
completion_msg = (
f"Deletion process completed for document: {doc_id}"
)
pipeline_status["latest_message"] = completion_msg
pipeline_status["history_messages"].append(completion_msg)
logger.info(completion_msg)
async def adelete_by_entity(self, entity_name: str) -> DeletionResult:
"""Asynchronously delete an entity and all its relationships.
@ -3752,22 +3390,16 @@ class LightRAG:
)
async def aedit_entity(
self,
entity_name: str,
updated_data: dict[str, str],
allow_rename: bool = True,
allow_merge: bool = False,
self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
) -> dict[str, Any]:
"""Asynchronously edit entity information.
Updates entity information in the knowledge graph and re-embeds the entity in the vector database.
Also synchronizes entity_chunks_storage and relation_chunks_storage to track chunk references.
Args:
entity_name: Name of the entity to edit
updated_data: Dictionary containing updated attributes, e.g. {"description": "new description", "entity_type": "new type"}
allow_rename: Whether to allow entity renaming, defaults to True
allow_merge: Whether to merge into an existing entity when renaming to an existing name
Returns:
Dictionary containing updated entity information
@ -3781,21 +3413,14 @@ class LightRAG:
entity_name,
updated_data,
allow_rename,
allow_merge,
self.entity_chunks,
self.relation_chunks,
)
def edit_entity(
self,
entity_name: str,
updated_data: dict[str, str],
allow_rename: bool = True,
allow_merge: bool = False,
self, entity_name: str, updated_data: dict[str, str], allow_rename: bool = True
) -> dict[str, Any]:
loop = always_get_an_event_loop()
return loop.run_until_complete(
self.aedit_entity(entity_name, updated_data, allow_rename, allow_merge)
self.aedit_entity(entity_name, updated_data, allow_rename)
)
async def aedit_relation(
@ -3804,7 +3429,6 @@ class LightRAG:
"""Asynchronously edit relation information.
Updates relation (edge) information in the knowledge graph and re-embeds the relation in the vector database.
Also synchronizes the relation_chunks_storage to track which chunks reference this relation.
Args:
source_entity: Name of the source entity
@ -3823,7 +3447,6 @@ class LightRAG:
source_entity,
target_entity,
updated_data,
self.relation_chunks,
)
def edit_relation(
@ -3935,8 +3558,6 @@ class LightRAG:
target_entity,
merge_strategy,
target_entity_data,
self.entity_chunks,
self.relation_chunks,
)
def merge_entities(

View file

@ -10,8 +10,8 @@ class NameSpace:
KV_STORE_LLM_RESPONSE_CACHE = "llm_response_cache"
KV_STORE_FULL_ENTITIES = "full_entities"
KV_STORE_FULL_RELATIONS = "full_relations"
KV_STORE_TENANTS = "tenants"
KV_STORE_KNOWLEDGE_BASES = "knowledge_bases"
KV_STORE_ENTITY_CHUNKS = "entity_chunks"
KV_STORE_RELATION_CHUNKS = "relation_chunks"
VECTOR_STORE_ENTITIES = "entities"
VECTOR_STORE_RELATIONSHIPS = "relationships"

File diff suppressed because it is too large Load diff

View file

@ -1,8 +1,6 @@
from __future__ import annotations
import weakref
import sys
import asyncio
import html
import csv
@ -37,40 +35,12 @@ from lightrag.constants import (
DEFAULT_LOG_FILENAME,
GRAPH_FIELD_SEP,
DEFAULT_MAX_TOTAL_TOKENS,
DEFAULT_MAX_FILE_PATH_LENGTH,
DEFAULT_SOURCE_IDS_LIMIT_METHOD,
VALID_SOURCE_IDS_LIMIT_METHODS,
SOURCE_IDS_LIMIT_METHOD_FIFO,
)
# Precompile regex pattern for JSON sanitization (module-level, compiled once)
_SURROGATE_PATTERN = re.compile(r"[\uD800-\uDFFF\uFFFE\uFFFF]")
class SafeStreamHandler(logging.StreamHandler):
"""StreamHandler that gracefully handles closed streams during shutdown.
This handler prevents "ValueError: I/O operation on closed file" errors
that can occur when pytest or other test frameworks close stdout/stderr
before Python's logging cleanup runs.
"""
def flush(self):
"""Flush the stream, ignoring errors if the stream is closed."""
try:
super().flush()
except (ValueError, OSError):
# Stream is closed or otherwise unavailable, silently ignore
pass
def close(self):
"""Close the handler, ignoring errors if the stream is already closed."""
try:
super().close()
except (ValueError, OSError):
# Stream is closed or otherwise unavailable, silently ignore
pass
# Initialize logger with basic configuration
logger = logging.getLogger("lightrag")
logger.propagate = False # prevent log message send to root logger
@ -78,7 +48,7 @@ logger.setLevel(logging.INFO)
# Add console handler if no handlers exist
if not logger.handlers:
console_handler = SafeStreamHandler()
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
formatter = logging.Formatter("%(levelname)s: %(message)s")
console_handler.setFormatter(formatter)
@ -87,33 +57,6 @@ if not logger.handlers:
# Set httpx logging level to WARNING
logging.getLogger("httpx").setLevel(logging.WARNING)
def _patch_ascii_colors_console_handler() -> None:
"""Prevent ascii_colors from printing flush errors during interpreter exit."""
try:
from ascii_colors import ConsoleHandler
except ImportError:
return
if getattr(ConsoleHandler, "_lightrag_patched", False):
return
original_handle_error = ConsoleHandler.handle_error
def _safe_handle_error(self, message: str) -> None: # type: ignore[override]
exc_type, _, _ = sys.exc_info()
if exc_type in (ValueError, OSError) and "close" in message.lower():
return
original_handle_error(self, message)
ConsoleHandler.handle_error = _safe_handle_error # type: ignore[assignment]
ConsoleHandler._lightrag_patched = True # type: ignore[attr-defined]
_patch_ascii_colors_console_handler()
# Global import for pypinyin with startup-time logging
try:
import pypinyin
@ -341,8 +284,8 @@ def setup_logger(
logger_instance.handlers = [] # Clear existing handlers
logger_instance.propagate = False
# Add console handler with safe stream handling
console_handler = SafeStreamHandler()
# Add console handler
console_handler = logging.StreamHandler()
console_handler.setFormatter(simple_formatter)
console_handler.setLevel(level)
logger_instance.addHandler(console_handler)
@ -408,69 +351,12 @@ class TaskState:
@dataclass
class EmbeddingFunc:
"""Embedding function wrapper with dimension validation
This class wraps an embedding function to ensure that the output embeddings have the correct dimension.
This class should not be wrapped multiple times.
Args:
embedding_dim: Expected dimension of the embeddings
func: The actual embedding function to wrap
max_token_size: Optional token limit for the embedding model
send_dimensions: Whether to inject embedding_dim as a keyword argument
"""
embedding_dim: int
func: callable
max_token_size: int | None = None # Token limit for the embedding model
send_dimensions: bool = (
False # Control whether to send embedding_dim to the function
)
max_token_size: int | None = None # deprecated keep it for compatible only
async def __call__(self, *args, **kwargs) -> np.ndarray:
# Only inject embedding_dim when send_dimensions is True
if self.send_dimensions:
# Check if user provided embedding_dim parameter
if "embedding_dim" in kwargs:
user_provided_dim = kwargs["embedding_dim"]
# If user's value differs from class attribute, output warning
if (
user_provided_dim is not None
and user_provided_dim != self.embedding_dim
):
logger.warning(
f"Ignoring user-provided embedding_dim={user_provided_dim}, "
f"using declared embedding_dim={self.embedding_dim} from decorator"
)
# Inject embedding_dim from decorator
kwargs["embedding_dim"] = self.embedding_dim
# Call the actual embedding function
result = await self.func(*args, **kwargs)
# Validate embedding dimensions using total element count
total_elements = result.size # Total number of elements in the numpy array
expected_dim = self.embedding_dim
# Check if total elements can be evenly divided by embedding_dim
if total_elements % expected_dim != 0:
raise ValueError(
f"Embedding dimension mismatch detected: "
f"total elements ({total_elements}) cannot be evenly divided by "
f"expected dimension ({expected_dim}). "
)
# Optional: Verify vector count matches input text count
actual_vectors = total_elements // expected_dim
if args and isinstance(args[0], (list, tuple)):
expected_vectors = len(args[0])
if actual_vectors != expected_vectors:
raise ValueError(
f"Vector count mismatch: "
f"expected {expected_vectors} vectors but got {actual_vectors} vectors (from embedding result)."
)
return result
return await self.func(*args, **kwargs)
def compute_args_hash(*args: Any) -> str:
@ -1021,123 +907,9 @@ def load_json(file_name):
return json.load(f)
def _sanitize_string_for_json(text: str) -> str:
"""Remove characters that cannot be encoded in UTF-8 for JSON serialization.
Uses regex for optimal performance with zero-copy optimization for clean strings.
Fast detection path for clean strings (99% of cases) with efficient removal for dirty strings.
Args:
text: String to sanitize
Returns:
Original string if clean (zero-copy), sanitized string if dirty
"""
if not text:
return text
# Fast path: Check if sanitization is needed using C-level regex search
if not _SURROGATE_PATTERN.search(text):
return text # Zero-copy for clean strings - most common case
# Slow path: Remove problematic characters using C-level regex substitution
return _SURROGATE_PATTERN.sub("", text)
class SanitizingJSONEncoder(json.JSONEncoder):
"""
Custom JSON encoder that sanitizes data during serialization.
This encoder cleans strings during the encoding process without creating
a full copy of the data structure, making it memory-efficient for large datasets.
"""
def encode(self, o):
"""Override encode method to handle simple string cases"""
if isinstance(o, str):
return json.encoder.encode_basestring(_sanitize_string_for_json(o))
return super().encode(o)
def iterencode(self, o, _one_shot=False):
"""
Override iterencode to sanitize strings during serialization.
This is the core method that handles complex nested structures.
"""
# Preprocess: sanitize all strings in the object
sanitized = self._sanitize_for_encoding(o)
# Call parent's iterencode with sanitized data
for chunk in super().iterencode(sanitized, _one_shot):
yield chunk
def _sanitize_for_encoding(self, obj):
"""
Recursively sanitize strings in an object.
Creates new objects only when necessary to avoid deep copies.
Args:
obj: Object to sanitize
Returns:
Sanitized object with cleaned strings
"""
if isinstance(obj, str):
return _sanitize_string_for_json(obj)
elif isinstance(obj, dict):
# Create new dict with sanitized keys and values
new_dict = {}
for k, v in obj.items():
clean_k = _sanitize_string_for_json(k) if isinstance(k, str) else k
clean_v = self._sanitize_for_encoding(v)
new_dict[clean_k] = clean_v
return new_dict
elif isinstance(obj, (list, tuple)):
# Sanitize list/tuple elements
cleaned = [self._sanitize_for_encoding(item) for item in obj]
return type(obj)(cleaned) if isinstance(obj, tuple) else cleaned
else:
# Numbers, booleans, None, etc. remain unchanged
return obj
def write_json(json_obj, file_name):
"""
Write JSON data to file with optimized sanitization strategy.
This function uses a two-stage approach:
1. Fast path: Try direct serialization (works for clean data ~99% of time)
2. Slow path: Use custom encoder that sanitizes during serialization
The custom encoder approach avoids creating a deep copy of the data,
making it memory-efficient. When sanitization occurs, the caller should
reload the cleaned data from the file to update shared memory.
Args:
json_obj: Object to serialize (may be a shallow copy from shared memory)
file_name: Output file path
Returns:
bool: True if sanitization was applied (caller should reload data),
False if direct write succeeded (no reload needed)
"""
try:
# Strategy 1: Fast path - try direct serialization
with open(file_name, "w", encoding="utf-8") as f:
json.dump(json_obj, f, indent=2, ensure_ascii=False)
return False # No sanitization needed, no reload required
except (UnicodeEncodeError, UnicodeDecodeError) as e:
logger.debug(f"Direct JSON write failed, using sanitizing encoder: {e}")
# Strategy 2: Use custom encoder (sanitizes during serialization, zero memory copy)
with open(file_name, "w", encoding="utf-8") as f:
json.dump(json_obj, f, indent=2, ensure_ascii=False, cls=SanitizingJSONEncoder)
logger.info(f"JSON sanitization applied during write: {file_name}")
return True # Sanitization applied, reload recommended
json.dump(json_obj, f, indent=2, ensure_ascii=False)
class TokenizerInterface(Protocol):
@ -2024,7 +1796,7 @@ def normalize_extracted_info(name: str, remove_inner_quotes=False) -> str:
- Filter out short numeric-only text (length < 3 and only digits/dots)
- remove_inner_quotes = True
remove Chinese quotes
remove English quotes in and around chinese
remove English queotes in and around chinese
Convert non-breaking spaces to regular spaces
Convert narrow non-breaking spaces after non-digits to regular spaces
@ -2780,52 +2552,6 @@ def apply_source_ids_limit(
return truncated
def compute_incremental_chunk_ids(
existing_full_chunk_ids: list[str],
old_chunk_ids: list[str],
new_chunk_ids: list[str],
) -> list[str]:
"""
Compute incrementally updated chunk IDs based on changes.
This function applies delta changes (additions and removals) to an existing
list of chunk IDs while maintaining order and ensuring deduplication.
Delta additions from new_chunk_ids are placed at the end.
Args:
existing_full_chunk_ids: Complete list of existing chunk IDs from storage
old_chunk_ids: Previous chunk IDs from source_id (chunks being replaced)
new_chunk_ids: New chunk IDs from updated source_id (chunks being added)
Returns:
Updated list of chunk IDs with deduplication
Example:
>>> existing = ['chunk-1', 'chunk-2', 'chunk-3']
>>> old = ['chunk-1', 'chunk-2']
>>> new = ['chunk-2', 'chunk-4']
>>> compute_incremental_chunk_ids(existing, old, new)
['chunk-3', 'chunk-2', 'chunk-4']
"""
# Calculate changes
chunks_to_remove = set(old_chunk_ids) - set(new_chunk_ids)
chunks_to_add = set(new_chunk_ids) - set(old_chunk_ids)
# Apply changes to full chunk_ids
# Step 1: Remove chunks that are no longer needed
updated_chunk_ids = [
cid for cid in existing_full_chunk_ids if cid not in chunks_to_remove
]
# Step 2: Add new chunks (preserving order from new_chunk_ids)
# Note: 'cid not in updated_chunk_ids' check ensures deduplication
for cid in new_chunk_ids:
if cid in chunks_to_add and cid not in updated_chunk_ids:
updated_chunk_ids.append(cid)
return updated_chunk_ids
def subtract_source_ids(
source_ids: Iterable[str],
ids_to_remove: Collection[str],
@ -2858,6 +2584,65 @@ def parse_relation_chunk_key(key: str) -> tuple[str, str]:
return parts[0], parts[1]
def build_file_path(already_file_paths, data_list, target):
"""Build file path string with UTF-8 byte length limit and deduplication
Args:
already_file_paths: List of existing file paths
data_list: List of data items containing file_path
target: Target name for logging warnings
Returns:
str: Combined file paths separated by GRAPH_FIELD_SEP
"""
# set: deduplication
file_paths_set = {fp for fp in already_file_paths if fp}
# string: filter empty value and keep file order in already_file_paths
file_paths = GRAPH_FIELD_SEP.join(fp for fp in already_file_paths if fp)
# Check if initial file_paths already exceeds byte length limit
if len(file_paths.encode("utf-8")) >= DEFAULT_MAX_FILE_PATH_LENGTH:
logger.warning(
f"Initial file_paths already exceeds {DEFAULT_MAX_FILE_PATH_LENGTH} bytes for {target}, "
f"current size: {len(file_paths.encode('utf-8'))} bytes"
)
# ignored file_paths
file_paths_ignore = ""
# add file_paths
for dp in data_list:
cur_file_path = dp.get("file_path")
# empty
if not cur_file_path:
continue
# skip duplicate item
if cur_file_path in file_paths_set:
continue
# add
file_paths_set.add(cur_file_path)
# check the UTF-8 byte length
new_addition = GRAPH_FIELD_SEP + cur_file_path if file_paths else cur_file_path
if (
len(file_paths.encode("utf-8")) + len(new_addition.encode("utf-8"))
< DEFAULT_MAX_FILE_PATH_LENGTH - 5
):
# append
file_paths += new_addition
else:
# ignore
file_paths_ignore += GRAPH_FIELD_SEP + cur_file_path
if file_paths_ignore:
logger.warning(
f"File paths exceed {DEFAULT_MAX_FILE_PATH_LENGTH} bytes for {target}, "
f"ignoring file path: {file_paths_ignore}"
)
return file_paths
def generate_track_id(prefix: str = "upload") -> str:
"""Generate a unique tracking ID with timestamp and UUID