Remove deprecated cache_by_modes functionality from all storage
This commit is contained in:
parent
8294d6d1b7
commit
cc1f7118e7
6 changed files with 9 additions and 243 deletions
|
|
@ -331,21 +331,6 @@ class BaseKVStorage(StorageNameSpace, ABC):
|
|||
None
|
||||
"""
|
||||
|
||||
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
|
||||
"""Delete specific records from storage by cache mode
|
||||
|
||||
Importance notes for in-memory storage:
|
||||
1. Changes will be persisted to disk during the next index_done_callback
|
||||
2. update flags to notify other processes that data persistence is needed
|
||||
|
||||
Args:
|
||||
modes (list[str]): List of cache modes to be dropped from storage
|
||||
|
||||
Returns:
|
||||
True: if the cache drop successfully
|
||||
False: if the cache drop failed, or the cache mode is not supported
|
||||
"""
|
||||
|
||||
|
||||
@dataclass
|
||||
class BaseGraphStorage(StorageNameSpace, ABC):
|
||||
|
|
@ -761,10 +746,6 @@ class DocStatusStorage(BaseKVStorage, ABC):
|
|||
Dictionary mapping status names to counts
|
||||
"""
|
||||
|
||||
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
|
||||
"""Drop cache is not supported for Doc Status storage"""
|
||||
return False
|
||||
|
||||
|
||||
class StoragesStatus(str, Enum):
|
||||
"""Storages status"""
|
||||
|
|
|
|||
|
|
@ -347,41 +347,6 @@ class TiDBKVStorage(BaseKVStorage):
|
|||
except Exception as e:
|
||||
logger.error(f"Error deleting records from {self.namespace}: {e}")
|
||||
|
||||
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
|
||||
"""Delete specific records from storage by cache mode
|
||||
|
||||
Args:
|
||||
modes (list[str]): List of cache modes to be dropped from storage
|
||||
|
||||
Returns:
|
||||
bool: True if successful, False otherwise
|
||||
"""
|
||||
if not modes:
|
||||
return False
|
||||
|
||||
try:
|
||||
table_name = namespace_to_table_name(self.namespace)
|
||||
if not table_name:
|
||||
return False
|
||||
|
||||
if table_name != "LIGHTRAG_LLM_CACHE":
|
||||
return False
|
||||
|
||||
# Build MySQL style IN query
|
||||
modes_list = ", ".join([f"'{mode}'" for mode in modes])
|
||||
sql = f"""
|
||||
DELETE FROM {table_name}
|
||||
WHERE workspace = :workspace
|
||||
AND mode IN ({modes_list})
|
||||
"""
|
||||
|
||||
logger.info(f"Deleting cache by modes: {modes}")
|
||||
await self.db.execute(sql, {"workspace": self.db.workspace})
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting cache by modes {modes}: {e}")
|
||||
return False
|
||||
|
||||
async def drop(self) -> dict[str, str]:
|
||||
"""Drop the storage"""
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -195,96 +195,6 @@ class JsonKVStorage(BaseKVStorage):
|
|||
if any_deleted:
|
||||
await set_all_update_flags(self.final_namespace)
|
||||
|
||||
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
|
||||
"""Delete specific records from storage by cache mode
|
||||
|
||||
Importance notes for in-memory storage:
|
||||
1. Changes will be persisted to disk during the next index_done_callback
|
||||
2. update flags to notify other processes that data persistence is needed
|
||||
|
||||
Args:
|
||||
modes (list[str]): List of cache modes to be dropped from storage
|
||||
|
||||
Returns:
|
||||
True: if the cache drop successfully
|
||||
False: if the cache drop failed
|
||||
"""
|
||||
if not modes:
|
||||
return False
|
||||
|
||||
try:
|
||||
async with self._storage_lock:
|
||||
keys_to_delete = []
|
||||
modes_set = set(modes) # Convert to set for efficient lookup
|
||||
|
||||
for key in list(self._data.keys()):
|
||||
# Parse flattened cache key: mode:cache_type:hash
|
||||
parts = key.split(":", 2)
|
||||
if len(parts) == 3 and parts[0] in modes_set:
|
||||
keys_to_delete.append(key)
|
||||
|
||||
# Batch delete
|
||||
for key in keys_to_delete:
|
||||
self._data.pop(key, None)
|
||||
|
||||
if keys_to_delete:
|
||||
await set_all_update_flags(self.final_namespace)
|
||||
logger.info(
|
||||
f"Dropped {len(keys_to_delete)} cache entries for modes: {modes}"
|
||||
)
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error dropping cache by modes: {e}")
|
||||
return False
|
||||
|
||||
# async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool:
|
||||
# """Delete specific cache records from storage by chunk IDs
|
||||
|
||||
# Importance notes for in-memory storage:
|
||||
# 1. Changes will be persisted to disk during the next index_done_callback
|
||||
# 2. update flags to notify other processes that data persistence is needed
|
||||
|
||||
# Args:
|
||||
# chunk_ids (list[str]): List of chunk IDs to be dropped from storage
|
||||
|
||||
# Returns:
|
||||
# True: if the cache drop successfully
|
||||
# False: if the cache drop failed
|
||||
# """
|
||||
# if not chunk_ids:
|
||||
# return False
|
||||
|
||||
# try:
|
||||
# async with self._storage_lock:
|
||||
# # Iterate through all cache modes to find entries with matching chunk_ids
|
||||
# for mode_key, mode_data in list(self._data.items()):
|
||||
# if isinstance(mode_data, dict):
|
||||
# # Check each cached entry in this mode
|
||||
# for cache_key, cache_entry in list(mode_data.items()):
|
||||
# if (
|
||||
# isinstance(cache_entry, dict)
|
||||
# and cache_entry.get("chunk_id") in chunk_ids
|
||||
# ):
|
||||
# # Remove this cache entry
|
||||
# del mode_data[cache_key]
|
||||
# logger.debug(
|
||||
# f"Removed cache entry {cache_key} for chunk {cache_entry.get('chunk_id')}"
|
||||
# )
|
||||
|
||||
# # If the mode is now empty, remove it entirely
|
||||
# if not mode_data:
|
||||
# del self._data[mode_key]
|
||||
|
||||
# # Set update flags to notify persistence is needed
|
||||
# await set_all_update_flags(self.final_namespace)
|
||||
|
||||
# logger.info(f"Cleared cache for {len(chunk_ids)} chunk IDs")
|
||||
# return True
|
||||
# except Exception as e:
|
||||
# logger.error(f"Error clearing cache by chunk IDs: {e}")
|
||||
# return False
|
||||
|
||||
async def drop(self) -> dict[str, str]:
|
||||
"""Drop all data from storage and clean up resources
|
||||
This action will persistent the data to disk immediately.
|
||||
|
|
|
|||
|
|
@ -232,28 +232,6 @@ class MongoKVStorage(BaseKVStorage):
|
|||
except PyMongoError as e:
|
||||
logger.error(f"Error deleting documents from {self.namespace}: {e}")
|
||||
|
||||
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
|
||||
"""Delete specific records from storage by cache mode
|
||||
|
||||
Args:
|
||||
modes (list[str]): List of cache modes to be dropped from storage
|
||||
|
||||
Returns:
|
||||
bool: True if successful, False otherwise
|
||||
"""
|
||||
if not modes:
|
||||
return False
|
||||
|
||||
try:
|
||||
# Build regex pattern to match flattened key format: mode:cache_type:hash
|
||||
pattern = f"^({'|'.join(modes)}):"
|
||||
result = await self._data.delete_many({"_id": {"$regex": pattern}})
|
||||
logger.info(f"Deleted {result.deleted_count} documents by modes: {modes}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error deleting cache by modes {modes}: {e}")
|
||||
return False
|
||||
|
||||
async def drop(self) -> dict[str, str]:
|
||||
"""Drop the storage by removing all documents in the collection.
|
||||
|
||||
|
|
|
|||
|
|
@ -307,8 +307,10 @@ class PostgreSQLDB:
|
|||
|
||||
# Remove deprecated mode field if it exists
|
||||
if "mode" in existing_column_names:
|
||||
logger.info("Removing deprecated mode column from LIGHTRAG_LLM_CACHE table")
|
||||
|
||||
logger.info(
|
||||
"Removing deprecated mode column from LIGHTRAG_LLM_CACHE table"
|
||||
)
|
||||
|
||||
# First, drop the primary key constraint that includes mode
|
||||
drop_pk_sql = """
|
||||
ALTER TABLE LIGHTRAG_LLM_CACHE
|
||||
|
|
@ -316,15 +318,17 @@ class PostgreSQLDB:
|
|||
"""
|
||||
await self.execute(drop_pk_sql)
|
||||
logger.info("Dropped old primary key constraint")
|
||||
|
||||
|
||||
# Drop the mode column
|
||||
drop_mode_sql = """
|
||||
ALTER TABLE LIGHTRAG_LLM_CACHE
|
||||
DROP COLUMN mode
|
||||
"""
|
||||
await self.execute(drop_mode_sql)
|
||||
logger.info("Successfully removed mode column from LIGHTRAG_LLM_CACHE table")
|
||||
|
||||
logger.info(
|
||||
"Successfully removed mode column from LIGHTRAG_LLM_CACHE table"
|
||||
)
|
||||
|
||||
# Create new primary key constraint without mode
|
||||
add_pk_sql = """
|
||||
ALTER TABLE LIGHTRAG_LLM_CACHE
|
||||
|
|
@ -1697,18 +1701,6 @@ class PGKVStorage(BaseKVStorage):
|
|||
except Exception as e:
|
||||
logger.error(f"Error while deleting records from {self.namespace}: {e}")
|
||||
|
||||
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
|
||||
"""Delete specific records from storage by cache mode (deprecated - mode field removed)
|
||||
|
||||
Args:
|
||||
modes (list[str]): List of cache modes (deprecated, no longer used)
|
||||
|
||||
Returns:
|
||||
bool: False (method deprecated due to mode field removal)
|
||||
"""
|
||||
logger.warning("drop_cache_by_modes is deprecated: mode field has been removed from LLM cache")
|
||||
return False
|
||||
|
||||
async def drop(self) -> dict[str, str]:
|
||||
"""Drop the storage"""
|
||||
try:
|
||||
|
|
|
|||
|
|
@ -397,66 +397,6 @@ class RedisKVStorage(BaseKVStorage):
|
|||
f"Deleted {deleted_count} of {len(ids)} entries from {self.namespace}"
|
||||
)
|
||||
|
||||
async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool:
|
||||
"""Delete specific records from storage by cache mode
|
||||
|
||||
Importance notes for Redis storage:
|
||||
1. This will immediately delete the specified cache modes from Redis
|
||||
|
||||
Args:
|
||||
modes (list[str]): List of cache modes to be dropped from storage
|
||||
|
||||
Returns:
|
||||
True: if the cache drop successfully
|
||||
False: if the cache drop failed
|
||||
"""
|
||||
if not modes:
|
||||
return False
|
||||
|
||||
try:
|
||||
async with self._get_redis_connection() as redis:
|
||||
keys_to_delete = []
|
||||
|
||||
# Find matching keys for each mode using SCAN
|
||||
for mode in modes:
|
||||
# Use correct pattern to match flattened cache key format {namespace}:{mode}:{cache_type}:{hash}
|
||||
pattern = f"{self.namespace}:{mode}:*"
|
||||
cursor = 0
|
||||
mode_keys = []
|
||||
|
||||
while True:
|
||||
cursor, keys = await redis.scan(
|
||||
cursor, match=pattern, count=1000
|
||||
)
|
||||
if keys:
|
||||
mode_keys.extend(keys)
|
||||
|
||||
if cursor == 0:
|
||||
break
|
||||
|
||||
keys_to_delete.extend(mode_keys)
|
||||
logger.info(
|
||||
f"Found {len(mode_keys)} keys for mode '{mode}' with pattern '{pattern}'"
|
||||
)
|
||||
|
||||
if keys_to_delete:
|
||||
# Batch delete
|
||||
pipe = redis.pipeline()
|
||||
for key in keys_to_delete:
|
||||
pipe.delete(key)
|
||||
results = await pipe.execute()
|
||||
deleted_count = sum(results)
|
||||
logger.info(
|
||||
f"Dropped {deleted_count} cache entries for modes: {modes}"
|
||||
)
|
||||
else:
|
||||
logger.warning(f"No cache entries found for modes: {modes}")
|
||||
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Error dropping cache by modes in Redis: {e}")
|
||||
return False
|
||||
|
||||
async def drop(self) -> dict[str, str]:
|
||||
"""Drop the storage by removing all keys under the current namespace.
|
||||
|
||||
|
|
|
|||
Loading…
Add table
Reference in a new issue