From cc1f7118e7ba9f015e0d5a9a4d74b7a75c6b3f3a Mon Sep 17 00:00:00 2001 From: yangdx Date: Tue, 5 Aug 2025 23:20:26 +0800 Subject: [PATCH] Remove deprecated cache_by_modes functionality from all storage --- lightrag/base.py | 19 ------ lightrag/kg/deprecated/tidb_impl.py | 35 ----------- lightrag/kg/json_kv_impl.py | 90 ----------------------------- lightrag/kg/mongo_impl.py | 22 ------- lightrag/kg/postgres_impl.py | 26 +++------ lightrag/kg/redis_impl.py | 60 ------------------- 6 files changed, 9 insertions(+), 243 deletions(-) diff --git a/lightrag/base.py b/lightrag/base.py index 9e88fb12..0e651f7b 100644 --- a/lightrag/base.py +++ b/lightrag/base.py @@ -331,21 +331,6 @@ class BaseKVStorage(StorageNameSpace, ABC): None """ - async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: - """Delete specific records from storage by cache mode - - Importance notes for in-memory storage: - 1. Changes will be persisted to disk during the next index_done_callback - 2. update flags to notify other processes that data persistence is needed - - Args: - modes (list[str]): List of cache modes to be dropped from storage - - Returns: - True: if the cache drop successfully - False: if the cache drop failed, or the cache mode is not supported - """ - @dataclass class BaseGraphStorage(StorageNameSpace, ABC): @@ -761,10 +746,6 @@ class DocStatusStorage(BaseKVStorage, ABC): Dictionary mapping status names to counts """ - async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: - """Drop cache is not supported for Doc Status storage""" - return False - class StoragesStatus(str, Enum): """Storages status""" diff --git a/lightrag/kg/deprecated/tidb_impl.py b/lightrag/kg/deprecated/tidb_impl.py index d60bb1f6..0d5dfca3 100644 --- a/lightrag/kg/deprecated/tidb_impl.py +++ b/lightrag/kg/deprecated/tidb_impl.py @@ -347,41 +347,6 @@ class TiDBKVStorage(BaseKVStorage): except Exception as e: logger.error(f"Error deleting records from {self.namespace}: {e}") - async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: - """Delete specific records from storage by cache mode - - Args: - modes (list[str]): List of cache modes to be dropped from storage - - Returns: - bool: True if successful, False otherwise - """ - if not modes: - return False - - try: - table_name = namespace_to_table_name(self.namespace) - if not table_name: - return False - - if table_name != "LIGHTRAG_LLM_CACHE": - return False - - # Build MySQL style IN query - modes_list = ", ".join([f"'{mode}'" for mode in modes]) - sql = f""" - DELETE FROM {table_name} - WHERE workspace = :workspace - AND mode IN ({modes_list}) - """ - - logger.info(f"Deleting cache by modes: {modes}") - await self.db.execute(sql, {"workspace": self.db.workspace}) - return True - except Exception as e: - logger.error(f"Error deleting cache by modes {modes}: {e}") - return False - async def drop(self) -> dict[str, str]: """Drop the storage""" try: diff --git a/lightrag/kg/json_kv_impl.py b/lightrag/kg/json_kv_impl.py index 70a265fe..d6d80079 100644 --- a/lightrag/kg/json_kv_impl.py +++ b/lightrag/kg/json_kv_impl.py @@ -195,96 +195,6 @@ class JsonKVStorage(BaseKVStorage): if any_deleted: await set_all_update_flags(self.final_namespace) - async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: - """Delete specific records from storage by cache mode - - Importance notes for in-memory storage: - 1. Changes will be persisted to disk during the next index_done_callback - 2. update flags to notify other processes that data persistence is needed - - Args: - modes (list[str]): List of cache modes to be dropped from storage - - Returns: - True: if the cache drop successfully - False: if the cache drop failed - """ - if not modes: - return False - - try: - async with self._storage_lock: - keys_to_delete = [] - modes_set = set(modes) # Convert to set for efficient lookup - - for key in list(self._data.keys()): - # Parse flattened cache key: mode:cache_type:hash - parts = key.split(":", 2) - if len(parts) == 3 and parts[0] in modes_set: - keys_to_delete.append(key) - - # Batch delete - for key in keys_to_delete: - self._data.pop(key, None) - - if keys_to_delete: - await set_all_update_flags(self.final_namespace) - logger.info( - f"Dropped {len(keys_to_delete)} cache entries for modes: {modes}" - ) - - return True - except Exception as e: - logger.error(f"Error dropping cache by modes: {e}") - return False - - # async def drop_cache_by_chunk_ids(self, chunk_ids: list[str] | None = None) -> bool: - # """Delete specific cache records from storage by chunk IDs - - # Importance notes for in-memory storage: - # 1. Changes will be persisted to disk during the next index_done_callback - # 2. update flags to notify other processes that data persistence is needed - - # Args: - # chunk_ids (list[str]): List of chunk IDs to be dropped from storage - - # Returns: - # True: if the cache drop successfully - # False: if the cache drop failed - # """ - # if not chunk_ids: - # return False - - # try: - # async with self._storage_lock: - # # Iterate through all cache modes to find entries with matching chunk_ids - # for mode_key, mode_data in list(self._data.items()): - # if isinstance(mode_data, dict): - # # Check each cached entry in this mode - # for cache_key, cache_entry in list(mode_data.items()): - # if ( - # isinstance(cache_entry, dict) - # and cache_entry.get("chunk_id") in chunk_ids - # ): - # # Remove this cache entry - # del mode_data[cache_key] - # logger.debug( - # f"Removed cache entry {cache_key} for chunk {cache_entry.get('chunk_id')}" - # ) - - # # If the mode is now empty, remove it entirely - # if not mode_data: - # del self._data[mode_key] - - # # Set update flags to notify persistence is needed - # await set_all_update_flags(self.final_namespace) - - # logger.info(f"Cleared cache for {len(chunk_ids)} chunk IDs") - # return True - # except Exception as e: - # logger.error(f"Error clearing cache by chunk IDs: {e}") - # return False - async def drop(self) -> dict[str, str]: """Drop all data from storage and clean up resources This action will persistent the data to disk immediately. diff --git a/lightrag/kg/mongo_impl.py b/lightrag/kg/mongo_impl.py index 9e2847f2..64622127 100644 --- a/lightrag/kg/mongo_impl.py +++ b/lightrag/kg/mongo_impl.py @@ -232,28 +232,6 @@ class MongoKVStorage(BaseKVStorage): except PyMongoError as e: logger.error(f"Error deleting documents from {self.namespace}: {e}") - async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: - """Delete specific records from storage by cache mode - - Args: - modes (list[str]): List of cache modes to be dropped from storage - - Returns: - bool: True if successful, False otherwise - """ - if not modes: - return False - - try: - # Build regex pattern to match flattened key format: mode:cache_type:hash - pattern = f"^({'|'.join(modes)}):" - result = await self._data.delete_many({"_id": {"$regex": pattern}}) - logger.info(f"Deleted {result.deleted_count} documents by modes: {modes}") - return True - except Exception as e: - logger.error(f"Error deleting cache by modes {modes}: {e}") - return False - async def drop(self) -> dict[str, str]: """Drop the storage by removing all documents in the collection. diff --git a/lightrag/kg/postgres_impl.py b/lightrag/kg/postgres_impl.py index 3cc0ac9a..804454f7 100644 --- a/lightrag/kg/postgres_impl.py +++ b/lightrag/kg/postgres_impl.py @@ -307,8 +307,10 @@ class PostgreSQLDB: # Remove deprecated mode field if it exists if "mode" in existing_column_names: - logger.info("Removing deprecated mode column from LIGHTRAG_LLM_CACHE table") - + logger.info( + "Removing deprecated mode column from LIGHTRAG_LLM_CACHE table" + ) + # First, drop the primary key constraint that includes mode drop_pk_sql = """ ALTER TABLE LIGHTRAG_LLM_CACHE @@ -316,15 +318,17 @@ class PostgreSQLDB: """ await self.execute(drop_pk_sql) logger.info("Dropped old primary key constraint") - + # Drop the mode column drop_mode_sql = """ ALTER TABLE LIGHTRAG_LLM_CACHE DROP COLUMN mode """ await self.execute(drop_mode_sql) - logger.info("Successfully removed mode column from LIGHTRAG_LLM_CACHE table") - + logger.info( + "Successfully removed mode column from LIGHTRAG_LLM_CACHE table" + ) + # Create new primary key constraint without mode add_pk_sql = """ ALTER TABLE LIGHTRAG_LLM_CACHE @@ -1697,18 +1701,6 @@ class PGKVStorage(BaseKVStorage): except Exception as e: logger.error(f"Error while deleting records from {self.namespace}: {e}") - async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: - """Delete specific records from storage by cache mode (deprecated - mode field removed) - - Args: - modes (list[str]): List of cache modes (deprecated, no longer used) - - Returns: - bool: False (method deprecated due to mode field removal) - """ - logger.warning("drop_cache_by_modes is deprecated: mode field has been removed from LLM cache") - return False - async def drop(self) -> dict[str, str]: """Drop the storage""" try: diff --git a/lightrag/kg/redis_impl.py b/lightrag/kg/redis_impl.py index ae18242f..1c8d3c68 100644 --- a/lightrag/kg/redis_impl.py +++ b/lightrag/kg/redis_impl.py @@ -397,66 +397,6 @@ class RedisKVStorage(BaseKVStorage): f"Deleted {deleted_count} of {len(ids)} entries from {self.namespace}" ) - async def drop_cache_by_modes(self, modes: list[str] | None = None) -> bool: - """Delete specific records from storage by cache mode - - Importance notes for Redis storage: - 1. This will immediately delete the specified cache modes from Redis - - Args: - modes (list[str]): List of cache modes to be dropped from storage - - Returns: - True: if the cache drop successfully - False: if the cache drop failed - """ - if not modes: - return False - - try: - async with self._get_redis_connection() as redis: - keys_to_delete = [] - - # Find matching keys for each mode using SCAN - for mode in modes: - # Use correct pattern to match flattened cache key format {namespace}:{mode}:{cache_type}:{hash} - pattern = f"{self.namespace}:{mode}:*" - cursor = 0 - mode_keys = [] - - while True: - cursor, keys = await redis.scan( - cursor, match=pattern, count=1000 - ) - if keys: - mode_keys.extend(keys) - - if cursor == 0: - break - - keys_to_delete.extend(mode_keys) - logger.info( - f"Found {len(mode_keys)} keys for mode '{mode}' with pattern '{pattern}'" - ) - - if keys_to_delete: - # Batch delete - pipe = redis.pipeline() - for key in keys_to_delete: - pipe.delete(key) - results = await pipe.execute() - deleted_count = sum(results) - logger.info( - f"Dropped {deleted_count} cache entries for modes: {modes}" - ) - else: - logger.warning(f"No cache entries found for modes: {modes}") - - return True - except Exception as e: - logger.error(f"Error dropping cache by modes in Redis: {e}") - return False - async def drop(self) -> dict[str, str]: """Drop the storage by removing all keys under the current namespace.