From 1485cb82e90f03d817bf97e3e3418d7112f33639 Mon Sep 17 00:00:00 2001 From: yangdx Date: Sun, 9 Nov 2025 13:37:33 +0800 Subject: [PATCH 1/2] Add LLM query cache cleanup tool for KV storage backends - Interactive cleanup workflow - Supports all KV storage types - Batch deletion with progress - Comprehensive error reporting - Preserves workspace isolation --- .../tools/README_CLEAN_LLM_QUERY_CACHE.md | 661 ++++++++++ lightrag/tools/clean_llm_query_cache.py | 1094 +++++++++++++++++ 2 files changed, 1755 insertions(+) create mode 100644 lightrag/tools/README_CLEAN_LLM_QUERY_CACHE.md create mode 100644 lightrag/tools/clean_llm_query_cache.py diff --git a/lightrag/tools/README_CLEAN_LLM_QUERY_CACHE.md b/lightrag/tools/README_CLEAN_LLM_QUERY_CACHE.md new file mode 100644 index 00000000..42514579 --- /dev/null +++ b/lightrag/tools/README_CLEAN_LLM_QUERY_CACHE.md @@ -0,0 +1,661 @@ +# LLM Query Cache Cleanup Tool - User Guide + +## Overview + +This tool cleans up LightRAG's LLM query cache from KV storage implementations. It specifically targets query caches generated during RAG query operations (modes: `mix`, `hybrid`, `local`, `global`), including both query and keywords caches. + +## Supported Storage Types + +1. **JsonKVStorage** - File-based JSON storage +2. **RedisKVStorage** - Redis database storage +3. **PGKVStorage** - PostgreSQL database storage +4. 
**MongoKVStorage** - MongoDB database storage

## Cache Types

The tool cleans up the following query cache types:

### Query Cache Modes (4 types)
- `mix:*` - Mixed mode query caches
- `hybrid:*` - Hybrid mode query caches
- `local:*` - Local mode query caches
- `global:*` - Global mode query caches

### Cache Content Types (2 types)
- `*:query:*` - Query result caches
- `*:keywords:*` - Keywords extraction caches

### Cache Key Format
```
<mode>:<cache_type>:<hash>
```

Examples:
- `mix:query:5ce04d25e957c290216cee5bfe6344fa`
- `mix:keywords:fee77b98244a0b047ce95e21060de60e`
- `global:query:abc123def456...`
- `local:keywords:789xyz...`

**Important Note**: This tool does NOT clean extraction caches (`default:extract:*` and `default:summary:*`). Use the migration tool or manual deletion for those caches.

## Prerequisites

- The tool reads storage configuration from environment variables or `config.ini`
- Ensure the target storage is properly configured and accessible
- Backup important data before running cleanup operations

## Usage

### Basic Usage

Run from the LightRAG project root directory:

```bash
python -m lightrag.tools.clean_llm_query_cache
# or
python lightrag/tools/clean_llm_query_cache.py
```

### Interactive Workflow

The tool guides you through the following steps:

#### 1. Select Storage Type
```
============================================================
LLM Query Cache Cleanup Tool - LightRAG
============================================================

=== Storage Setup ===

Supported KV Storage Types:
[1] JsonKVStorage
[2] RedisKVStorage
[3] PGKVStorage
[4] MongoKVStorage

Select storage type (1-4) (Press Enter to exit): 1
```

**Note**: You can press Enter or type `0` at any prompt to exit gracefully.

#### 2. 
Storage Validation +The tool will: +- Check required environment variables +- Auto-detect workspace configuration +- Initialize and connect to storage +- Verify connection status + +``` +Checking configuration... +āœ“ All required environment variables are set + +Initializing storage... +- Storage Type: JsonKVStorage +- Workspace: space1 +- Connection Status: āœ“ Success +``` + +#### 3. View Cache Statistics + +The tool displays a detailed breakdown of query caches by mode and type: + +``` +Counting query cache records... + +šŸ“Š Query Cache Statistics (Before Cleanup): +ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā” +│ Mode │ Query │ Keywords │ Total │ +ā”œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¼ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¼ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¼ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¤ +│ mix │ 1,234 │ 567 │ 1,801 │ +│ hybrid │ 890 │ 423 │ 1,313 │ +│ local │ 2,345 │ 1,123 │ 3,468 │ +│ global │ 678 │ 345 │ 1,023 │ +ā”œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¼ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¼ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¼ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¤ +│ Total │ 5,147 │ 2,458 │ 7,605 │ +ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜ +``` + +#### 4. 
Select Cleanup Scope + +Choose what type of caches to delete: + +``` +=== Cleanup Options === +[1] Delete all query caches (both query and keywords) +[2] Delete query caches only (keep keywords) +[3] Delete keywords caches only (keep query) +[0] Cancel + +Select cleanup option (0-3): 1 +``` + +**Cleanup Types:** +- **Option 1 (all)**: Deletes both query and keywords caches across all modes +- **Option 2 (query)**: Deletes only query caches, preserves keywords caches +- **Option 3 (keywords)**: Deletes only keywords caches, preserves query caches + +#### 5. Confirm Deletion + +Review the cleanup plan and confirm: + +``` +============================================================ +Cleanup Confirmation +============================================================ +Storage: JsonKVStorage (workspace: space1) +Cleanup Type: all +Records to Delete: 7,605 / 7,605 + +āš ļø WARNING: This will delete ALL query caches across all modes! + +Continue with deletion? (y/n): y +``` + +#### 6. Execute Cleanup + +The tool performs batch deletion with real-time progress: + +**JsonKVStorage Example:** +``` +=== Starting Cleanup === +šŸ’” Processing 1,000 records at a time from JsonKVStorage + +Batch 1/8: ā–ˆā–ˆā–ˆā–ˆā–‘ā–‘ā–‘ā–‘ā–‘ā–‘ā–‘ā–‘ā–‘ā–‘ā–‘ā–‘ā–‘ā–‘ā–‘ā–‘ 1,000/7,605 (13.1%) āœ“ +Batch 2/8: ā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–‘ā–‘ā–‘ā–‘ā–‘ā–‘ā–‘ā–‘ā–‘ā–‘ā–‘ā–‘ 2,000/7,605 (26.3%) āœ“ +... +Batch 8/8: ā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆā–ˆ 7,605/7,605 (100.0%) āœ“ + +Persisting changes to storage... +āœ“ Changes persisted successfully +``` + +**RedisKVStorage Example:** +``` +=== Starting Cleanup === +šŸ’” Processing Redis keys in batches of 1,000 + +Batch 1: Deleted 1,000 keys (Total: 1,000) āœ“ +Batch 2: Deleted 1,000 keys (Total: 2,000) āœ“ +... 
+``` + +**PostgreSQL Example:** +``` +=== Starting Cleanup === +šŸ’” Executing PostgreSQL DELETE query + +āœ“ Deleted 7,605 records in 0.45s +``` + +**MongoDB Example:** +``` +=== Starting Cleanup === +šŸ’” Executing MongoDB deleteMany operations + +Pattern 1/8: Deleted 1,234 records āœ“ +Pattern 2/8: Deleted 567 records āœ“ +... +Total deleted: 7,605 records +``` + +#### 7. Review Cleanup Report + +The tool provides a comprehensive final report: + +**Successful Cleanup:** +``` +============================================================ +Cleanup Complete - Final Report +============================================================ + +šŸ“Š Statistics: + Total records to delete: 7,605 + Total batches: 8 + Successful batches: 8 + Failed batches: 0 + Successfully deleted: 7,605 + Failed to delete: 0 + Success rate: 100.00% + +šŸ“ˆ Before/After Comparison: + Total caches before: 7,605 + Total caches after: 0 + Net reduction: 7,605 + +============================================================ +āœ“ SUCCESS: All records cleaned up successfully! 
+============================================================ + +šŸ“Š Query Cache Statistics (After Cleanup): +ā”Œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¬ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā” +│ Mode │ Query │ Keywords │ Total │ +ā”œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¼ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¼ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¼ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¤ +│ mix │ 0 │ 0 │ 0 │ +│ hybrid │ 0 │ 0 │ 0 │ +│ local │ 0 │ 0 │ 0 │ +│ global │ 0 │ 0 │ 0 │ +ā”œā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¼ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¼ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¼ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”¤ +│ Total │ 0 │ 0 │ 0 │ +ā””ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”“ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”€ā”˜ +``` + +**Cleanup with Errors:** +``` +============================================================ +Cleanup Complete - Final Report +============================================================ + +šŸ“Š Statistics: + Total records to delete: 7,605 + Total batches: 8 + Successful batches: 7 + Failed batches: 1 + Successfully deleted: 6,605 + Failed to delete: 1,000 + Success rate: 86.85% + +šŸ“ˆ Before/After Comparison: + Total caches before: 7,605 + Total caches after: 1,000 + Net reduction: 6,605 + +āš ļø Errors encountered: 1 + +Error Details: +------------------------------------------------------------ + +Error Summary: + - ConnectionError: 1 occurrence(s) + +First 5 errors: + + 1. Batch 3 + Type: ConnectionError + Message: Connection timeout after 30s + Records lost: 1,000 + +============================================================ +āš ļø WARNING: Cleanup completed with errors! + Please review the error details above. 
+============================================================ +``` + +## Technical Details + +### Workspace Handling + +The tool retrieves workspace in the following priority order: + +1. **Storage-specific workspace environment variables** + - PGKVStorage: `POSTGRES_WORKSPACE` + - MongoKVStorage: `MONGODB_WORKSPACE` + - RedisKVStorage: `REDIS_WORKSPACE` + +2. **Generic workspace environment variable** + - `WORKSPACE` + +3. **Default value** + - Empty string (uses storage's default workspace) + +### Batch Deletion + +- Default batch size: 1000 records/batch +- Prevents memory overflow and connection timeouts +- Each batch is processed independently +- Failed batches are logged but don't stop cleanup + +### Storage-Specific Deletion Strategies + +#### JsonKVStorage +- Collects all matching keys first (snapshot approach) +- Deletes in batches with lock protection +- Fast in-memory operations + +#### RedisKVStorage +- Uses SCAN with pattern matching +- Pipeline DELETE for batch operations +- Cursor-based iteration for large datasets + +#### PostgreSQL +- Single DELETE query with OR conditions +- Efficient server-side bulk deletion +- Uses LIKE patterns for mode/type matching + +#### MongoDB +- Multiple deleteMany operations (one per pattern) +- Regex-based document matching +- Returns exact deletion counts + +### Pattern Matching Implementation + +**JsonKVStorage:** +```python +# Direct key prefix matching +if key.startswith("mix:query:") or key.startswith("mix:keywords:") +``` + +**RedisKVStorage:** +```python +# SCAN with namespace-prefixed patterns +pattern = f"{namespace}:mix:query:*" +cursor, keys = await redis.scan(cursor, match=pattern) +``` + +**PostgreSQL:** +```python +# SQL LIKE conditions +WHERE id LIKE 'mix:query:%' OR id LIKE 'mix:keywords:%' +``` + +**MongoDB:** +```python +# Regex queries on _id field +{"_id": {"$regex": "^mix:query:"}} +``` + +## Error Handling & Resilience + +The tool implements comprehensive error tracking: + +### Batch-Level Error 
Tracking +- Each batch is independently error-checked +- Failed batches are logged with full details +- Successful batches commit even if later batches fail +- Real-time progress shows āœ“ (success) or āœ— (failed) + +### Error Reporting +After cleanup completes, a detailed report includes: +- **Statistics**: Total records, success/failure counts, success rate +- **Before/After Comparison**: Net reduction in cache count +- **Error Summary**: Grouped by error type with occurrence counts +- **Error Details**: Batch number, error type, message, and records lost +- **Recommendations**: Clear indication of success or need for review + +### Verification +- Post-cleanup count verification +- Before/after statistics comparison +- Identifies partial cleanup scenarios + +## Important Notes + +1. **Irreversible Operation** + - Deleted caches cannot be recovered + - Always backup important data before cleanup + - Test on non-production data first + +2. **Performance Impact** + - Query performance may degrade temporarily after cleanup + - Caches will rebuild on subsequent queries + - Consider cleanup during off-peak hours + +3. **Selective Cleanup** + - Choose cleanup scope carefully + - Keywords caches may be valuable for future queries + - Query caches rebuild faster than keywords caches + +4. **Workspace Isolation** + - Cleanup only affects the selected workspace + - Other workspaces remain untouched + - Verify workspace before confirming cleanup + +5. **Interrupt and Resume** + - Cleanup can be interrupted at any time (Ctrl+C) + - Already deleted records cannot be recovered + - No automatic resume - must run tool again + +## Storage Configuration + +The tool supports multiple configuration methods with the following priority: + +1. **Environment variables** (highest priority) +2. **config.ini file** (medium priority) +3. 
**Default values** (lowest priority) + +### Environment Variable Configuration + +Configure storage settings in your `.env` file: + +#### Workspace Configuration (Optional) + +```bash +# Generic workspace (shared by all storages) +WORKSPACE=space1 + +# Or configure independent workspace for specific storage +POSTGRES_WORKSPACE=pg_space +MONGODB_WORKSPACE=mongo_space +REDIS_WORKSPACE=redis_space +``` + +**Workspace Priority**: Storage-specific > Generic WORKSPACE > Empty string + +#### JsonKVStorage + +```bash +WORKING_DIR=./rag_storage +``` + +#### RedisKVStorage + +```bash +REDIS_URI=redis://localhost:6379 +``` + +#### PGKVStorage + +```bash +POSTGRES_HOST=localhost +POSTGRES_PORT=5432 +POSTGRES_USER=your_username +POSTGRES_PASSWORD=your_password +POSTGRES_DATABASE=your_database +``` + +#### MongoKVStorage + +```bash +MONGO_URI=mongodb://root:root@localhost:27017/ +MONGO_DATABASE=LightRAG +``` + +### config.ini Configuration + +Alternatively, create a `config.ini` file in the project root: + +```ini +[redis] +uri = redis://localhost:6379 + +[postgres] +host = localhost +port = 5432 +user = postgres +password = yourpassword +database = lightrag + +[mongodb] +uri = mongodb://root:root@localhost:27017/ +database = LightRAG +``` + +**Note**: Environment variables take precedence over config.ini settings. 
+ +## Troubleshooting + +### Missing Environment Variables +``` +āš ļø Warning: Missing environment variables: POSTGRES_USER, POSTGRES_PASSWORD +``` +**Solution**: Add missing variables to your `.env` file or configure in `config.ini` + +### Connection Failed +``` +āœ— Initialization failed: Connection refused +``` +**Solutions**: +- Check if database service is running +- Verify connection parameters (host, port, credentials) +- Check firewall settings +- Ensure network connectivity for remote databases + +### No Caches Found +``` +āš ļø No query caches found in storage +``` +**Possible Reasons**: +- No queries have been run yet +- Caches were already cleaned +- Wrong workspace selected +- Different storage type was used for queries + +### Partial Cleanup +``` +āš ļø WARNING: Cleanup completed with errors! +``` +**Solutions**: +- Check error details in the report +- Verify storage connection stability +- Re-run tool to clean remaining caches +- Check storage capacity and permissions + +## Use Cases + +### Use Case 1: Clean All Query Caches + +**Scenario**: Free up storage space by removing all query caches + +```bash +# Run tool +python -m lightrag.tools.clean_llm_query_cache + +# Select: Storage type -> Option 1 (all) -> Confirm (y) +``` + +**Result**: All query and keywords caches deleted, maximum storage freed + +### Use Case 2: Refresh Query Caches Only + +**Scenario**: Force query cache rebuild while keeping keywords + +```bash +# Run tool +python -m lightrag.tools.clean_llm_query_cache + +# Select: Storage type -> Option 2 (query only) -> Confirm (y) +``` + +**Result**: Query caches deleted, keywords preserved for faster rebuild + +### Use Case 3: Clean Stale Keywords + +**Scenario**: Remove outdated keywords while keeping recent query results + +```bash +# Run tool +python -m lightrag.tools.clean_llm_query_cache + +# Select: Storage type -> Option 3 (keywords only) -> Confirm (y) +``` + +**Result**: Keywords deleted, query caches preserved + +### Use 
Case 4: Workspace-Specific Cleanup + +**Scenario**: Clean caches for a specific workspace + +```bash +# Configure workspace +export WORKSPACE=development + +# Run tool +python -m lightrag.tools.clean_llm_query_cache + +# Select: Storage type -> Cleanup option -> Confirm (y) +``` + +**Result**: Only development workspace caches cleaned + +## Best Practices + +1. **Backup Before Cleanup** + - Always backup your storage before major cleanup + - Test cleanup on non-production data first + - Document cleanup decisions + +2. **Monitor Performance** + - Watch storage metrics during cleanup + - Monitor query performance after cleanup + - Allow time for cache rebuild + +3. **Scheduled Cleanup** + - Clean caches periodically (weekly/monthly) + - Automate cleanup for development environments + - Keep production cleanup manual for safety + +4. **Selective Deletion** + - Consider cleanup scope based on needs + - Keywords caches are harder to rebuild + - Query caches rebuild automatically + +5. **Storage Capacity** + - Monitor storage usage trends + - Clean caches before reaching capacity limits + - Archive old data if needed + +## Comparison with Migration Tool + +| Feature | Cleanup Tool | Migration Tool | +|---------|-------------|----------------| +| **Purpose** | Delete query caches | Migrate extraction caches | +| **Cache Types** | mix/hybrid/local/global | default:extract/summary | +| **Modes** | query, keywords | extract, summary | +| **Operation** | Deletion | Copy between storages | +| **Reversible** | No | Yes (source unchanged) | +| **Use Case** | Free storage, refresh caches | Change storage backend | + +## Limitations + +1. **Single Storage Operation** + - Can only clean one storage type at a time + - To clean multiple storages, run tool multiple times + +2. **No Dry Run Mode** + - Deletion is immediate after confirmation + - No preview-only mode available + - Test on non-production first + +3. 
**No Selective Mode Cleanup** + - Cannot clean only specific modes (e.g., only `mix`) + - Cleanup applies to all modes for selected cache type + - All-or-nothing per cache type + +4. **No Scheduled Cleanup** + - Manual execution required + - No built-in scheduling + - Use cron/scheduler if automation needed + +5. **Verification Limitations** + - Post-cleanup verification may fail in error scenarios + - Manual verification recommended for critical operations + +## Future Enhancements + +Potential improvements for future versions: + +- Selective mode cleanup (e.g., clean only `mix` mode) +- Age-based cleanup (delete caches older than X days) +- Size-based cleanup (delete largest caches first) +- Dry run mode for safe preview +- Automated scheduling support +- Cache statistics export +- Incremental cleanup with pause/resume + +## Support + +For issues, questions, or feature requests: +- Check the error details in the cleanup report +- Review storage configuration +- Verify workspace settings +- Test with a small dataset first +- Report bugs through project issue tracker diff --git a/lightrag/tools/clean_llm_query_cache.py b/lightrag/tools/clean_llm_query_cache.py new file mode 100644 index 00000000..caa1ad06 --- /dev/null +++ b/lightrag/tools/clean_llm_query_cache.py @@ -0,0 +1,1094 @@ +#!/usr/bin/env python3 +""" +LLM Query Cache Cleanup Tool for LightRAG + +This tool cleans up LLM query cache (mix:*, hybrid:*, local:*, global:*) +from KV storage implementations while preserving workspace isolation. 

Usage:
    python -m lightrag.tools.clean_llm_query_cache
    # or
    python lightrag/tools/clean_llm_query_cache.py

Supported KV Storage Types:
    - JsonKVStorage
    - RedisKVStorage
    - PGKVStorage
    - MongoKVStorage
"""

import asyncio
import os
import sys
import time
from typing import Any, Dict, List
from dataclasses import dataclass, field
from dotenv import load_dotenv

# Add project root to path for imports
sys.path.insert(
    0, os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
)

from lightrag.kg import STORAGE_ENV_REQUIREMENTS
from lightrag.kg.shared_storage import set_all_update_flags
from lightrag.namespace import NameSpace
from lightrag.utils import setup_logger

# Load environment variables (.env values do NOT override already-set env vars)
load_dotenv(dotenv_path=".env", override=False)

# Setup logger
setup_logger("lightrag", level="INFO")

# Menu index -> storage implementation name shown in the interactive prompt
STORAGE_TYPES = {
    "1": "JsonKVStorage",
    "2": "RedisKVStorage",
    "3": "PGKVStorage",
    "4": "MongoKVStorage",
}

# Storage-specific workspace env var per backend; JsonKVStorage has none and
# falls back to the generic WORKSPACE variable (see get_workspace_for_storage)
WORKSPACE_ENV_MAP = {
    "PGKVStorage": "POSTGRES_WORKSPACE",
    "MongoKVStorage": "MONGODB_WORKSPACE",
    "RedisKVStorage": "REDIS_WORKSPACE",
}

# Query cache modes targeted by this tool (extraction caches are untouched)
QUERY_MODES = ["mix", "hybrid", "local", "global"]

# Cache content types: keys look like "<mode>:<cache_type>:<hash>"
CACHE_TYPES = ["query", "keywords"]

# Default batch size for deletion
DEFAULT_BATCH_SIZE = 1000

# ANSI color codes for terminal output
BOLD_CYAN = "\033[1;36m"
BOLD_RED = "\033[1;31m"
BOLD_GREEN = "\033[1;32m"
RESET = "\033[0m"


@dataclass
class CleanupStats:
    """Cleanup statistics and error tracking"""

    # Count by mode and cache_type before cleanup
    counts_before: Dict[str, Dict[str, int]] = field(default_factory=dict)

    # Deletion statistics
    total_to_delete: int = 0
    total_batches: int = 0
    successful_batches: int = 0
    failed_batches: int = 0
    successfully_deleted: int = 0
    failed_to_delete: int = 0

    # Count by mode and cache_type after cleanup
    counts_after: Dict[str, Dict[str, int]] = field(default_factory=dict)

    # Error tracking: one dict per failed batch (see add_error for the schema)
    errors: List[Dict[str, Any]] = field(default_factory=list)

    def add_error(self, batch_idx: int, error: Exception, batch_size: int):
        """Record batch error

        Appends an error record and bumps the failure counters.
        `batch_size` is the number of records presumed lost in the failed batch.
        """
        self.errors.append(
            {
                "batch": batch_idx,
                "error_type": type(error).__name__,
                "error_msg": str(error),
                "records_lost": batch_size,
                "timestamp": time.time(),
            }
        )
        self.failed_batches += 1
        self.failed_to_delete += batch_size

    def initialize_counts(self):
        """Initialize count dictionaries

        Zero-fills counts_before/counts_after for every mode and cache type so
        later code can increment without key checks.
        """
        for mode in QUERY_MODES:
            self.counts_before[mode] = {"query": 0, "keywords": 0}
            self.counts_after[mode] = {"query": 0, "keywords": 0}


class CleanupTool:
    """LLM Query Cache Cleanup Tool"""

    def __init__(self):
        # Active storage instance (set after initialize_storage succeeds)
        self.storage = None
        # Selected workspace; empty string means the storage's default workspace
        self.workspace = ""
        # Records per deletion batch
        self.batch_size = DEFAULT_BATCH_SIZE

    def get_workspace_for_storage(self, storage_name: str) -> str:
        """Get workspace for a specific storage type

        Priority: Storage-specific env var > WORKSPACE env var > empty string

        Args:
            storage_name: Storage implementation name

        Returns:
            Workspace name
        """
        # Check storage-specific workspace
        if storage_name in WORKSPACE_ENV_MAP:
            specific_workspace = os.getenv(WORKSPACE_ENV_MAP[storage_name])
            if specific_workspace:
                return specific_workspace

        # Check generic WORKSPACE
        workspace = os.getenv("WORKSPACE", "")
        return workspace

    def check_config_ini_for_storage(self, storage_name: str) -> bool:
        """Check if config.ini has configuration for the storage type

        Best-effort probe only: any parsing/IO error is treated as "no config".
        JsonKVStorage needs no config.ini section and always yields False here.

        Args:
            storage_name: Storage implementation name

        Returns:
            True if config.ini has the necessary configuration
        """
        try:
            import configparser

            config = configparser.ConfigParser()
            # Second positional arg of ConfigParser.read() is the encoding
            config.read("config.ini", "utf-8")

            if storage_name == "RedisKVStorage":
                return config.has_option("redis", "uri")
            elif storage_name == "PGKVStorage":
                return (
                    config.has_option("postgres", "user")
                    and config.has_option("postgres", "password")
                    and config.has_option("postgres", "database")
                )
            elif storage_name == "MongoKVStorage":
                return config.has_option("mongodb", "uri") and config.has_option(
                    "mongodb", "database"
                )

            return False
        except Exception:
            # Missing/unreadable config.ini simply means "not configured there"
            return False

    def check_env_vars(self, storage_name: str) -> bool:
        """Check environment variables, show warnings if missing but don't fail

        Args:
            storage_name: Storage implementation name

        Returns:
            Always returns True (warnings only, no hard failure)
        """
        required_vars = STORAGE_ENV_REQUIREMENTS.get(storage_name, [])

        if not required_vars:
            print("āœ“ No environment variables required")
            return True

        missing_vars = [var for var in required_vars if var not in os.environ]

        if missing_vars:
            print(
                f"āš ļø  Warning: Missing environment variables: {', '.join(missing_vars)}"
            )

            # Check if config.ini has configuration
            has_config = self.check_config_ini_for_storage(storage_name)
            if has_config:
                print("   āœ“ Found configuration in config.ini")
            else:
                print(f"   Will attempt to use defaults for {storage_name}")

            return True

        print("āœ“ All required environment variables are set")
        return True

    def get_storage_class(self, storage_name: str):
        """Dynamically import and return storage class

        Imports are deferred so only the selected backend's driver needs to be
        installed.

        Args:
            storage_name: Storage implementation name

        Returns:
            Storage class

        Raises:
            ValueError: If storage_name is not one of the supported types
        """
        if storage_name == "JsonKVStorage":
            from lightrag.kg.json_kv_impl import JsonKVStorage

            return JsonKVStorage
        elif storage_name == "RedisKVStorage":
            from lightrag.kg.redis_impl import RedisKVStorage

            return RedisKVStorage
        elif storage_name == "PGKVStorage":
            from lightrag.kg.postgres_impl import PGKVStorage

            return PGKVStorage
        elif storage_name == "MongoKVStorage":
            from lightrag.kg.mongo_impl import MongoKVStorage

            return MongoKVStorage
        else:
            raise ValueError(f"Unsupported storage type: {storage_name}")

    async def initialize_storage(self, storage_name: str, workspace: str):
        """Initialize storage instance with fallback to config.ini and defaults

        Args:
            storage_name: Storage implementation name
            workspace: Workspace name

        Returns:
            Initialized storage instance

        Raises:
            Exception: If initialization fails
        """
        storage_class = self.get_storage_class(storage_name)

        # Create global config (minimal subset the KV storages need;
        # embedding_batch_num is required by the constructor but unused here)
        global_config = {
            "working_dir": os.getenv("WORKING_DIR", "./rag_storage"),
            "embedding_batch_num": 10,
        }

        # Initialize storage — always targets the LLM response cache namespace
        storage = storage_class(
            namespace=NameSpace.KV_STORE_LLM_RESPONSE_CACHE,
            workspace=workspace,
            global_config=global_config,
            embedding_func=None,
        )

        # Initialize the storage (may raise exception if connection fails)
        await storage.initialize()

        return storage

    async def count_query_caches_json(self, storage) -> Dict[str, Dict[str, int]]:
        """Count query caches in JsonKVStorage by mode and cache_type

        Args:
            storage: JsonKVStorage instance

        Returns:
            Dictionary with counts for each mode and cache_type
        """
        counts = {mode: {"query": 0, "keywords": 0} for mode in QUERY_MODES}

        # Hold the storage lock so we count a consistent in-memory snapshot
        async with storage._storage_lock:
            for key in storage._data.keys():
                for mode in QUERY_MODES:
                    if key.startswith(f"{mode}:query:"):
                        counts[mode]["query"] += 1
                    elif key.startswith(f"{mode}:keywords:"):
                        counts[mode]["keywords"] += 1

        return counts

    async def count_query_caches_redis(self, storage) -> Dict[str, Dict[str, int]]:
        """Count query caches in RedisKVStorage by mode and cache_type

        Uses cursor-based SCAN per mode/type pattern; counts may drift if keys
        are concurrently added/removed during the scan (SCAN guarantees are
        weak under concurrent modification).

        Args:
            storage: RedisKVStorage instance

        Returns:
            Dictionary with counts for each mode and cache_type
        """
        counts = {mode: {"query": 0, "keywords": 0} for mode in QUERY_MODES}

        print("Scanning Redis keys...", end="", flush=True)

        async with storage._get_redis_connection() as redis:
            for mode in QUERY_MODES:
                for cache_type in CACHE_TYPES:
                    pattern = f"{mode}:{cache_type}:*"
                    # Redis keys are namespace-prefixed by the storage layer
                    prefixed_pattern = f"{storage.final_namespace}:{pattern}"
                    cursor = 0

                    while True:
                        # COUNT is only a per-iteration hint to the server
                        cursor, keys = await redis.scan(
                            cursor, match=prefixed_pattern, count=DEFAULT_BATCH_SIZE
                        )
                        counts[mode][cache_type] += len(keys)

                        # SCAN terminates when the cursor wraps back to 0
                        if cursor == 0:
                            break

        print()  # New line after progress
        return counts

    async def count_query_caches_pg(self, storage) -> Dict[str, Dict[str, int]]:
        """Count query caches in PostgreSQL by mode and cache_type

        Runs one COUNT(*) per mode/type pair (8 queries). table_name is
        interpolated into the SQL, but it comes from the internal
        namespace_to_table_name mapping, not user input; workspace and the
        LIKE pattern are bound as parameters.

        Args:
            storage: PGKVStorage instance

        Returns:
            Dictionary with counts for each mode and cache_type
        """
        from lightrag.kg.postgres_impl import namespace_to_table_name

        counts = {mode: {"query": 0, "keywords": 0} for mode in QUERY_MODES}
        table_name = namespace_to_table_name(storage.namespace)

        print("Counting PostgreSQL records...", end="", flush=True)
        start_time = time.time()

        for mode in QUERY_MODES:
            for cache_type in CACHE_TYPES:
                query = f"""
                    SELECT COUNT(*) as count
                    FROM {table_name}
                    WHERE workspace = $1
                    AND id LIKE $2
                """
                pattern = f"{mode}:{cache_type}:%"
                # NOTE(review): assumes db.query returns a single row mapping
                # with a "count" key — confirm against PGKVStorage.db API
                result = await storage.db.query(query, [storage.workspace, pattern])
                counts[mode][cache_type] = result["count"] if result else 0

        elapsed = time.time() - start_time
        if elapsed > 1:
            print(f" (took {elapsed:.1f}s)", end="")
        print()  # New line

        return counts

    async def count_query_caches_mongo(self, storage) -> Dict[str, Dict[str, int]]:
        """Count query caches in MongoDB by mode and cache_type

        One count_documents() per mode/type pair, matching _id by anchored
        regex prefix.

        Args:
            storage: MongoKVStorage instance

        Returns:
            Dictionary with counts for each mode and cache_type
        """
        counts = {mode: {"query": 0, "keywords": 0} for mode in QUERY_MODES}

        print("Counting MongoDB documents...", end="", flush=True)
        start_time = time.time()

        for mode in QUERY_MODES:
            for cache_type in CACHE_TYPES:
                pattern = f"^{mode}:{cache_type}:"
                query = {"_id": {"$regex": pattern}}
                count = await storage._data.count_documents(query)
                counts[mode][cache_type] = count

        elapsed = time.time() - start_time
        if elapsed > 1:
            print(f" (took {elapsed:.1f}s)", end="")
        print()  # New line

        return counts

    async def count_query_caches(
        self, storage, storage_name: str
    ) -> Dict[str, Dict[str, int]]:
        """Count query caches from any storage type efficiently

        Dispatches to the backend-specific counter.

        Args:
            storage: Storage instance
            storage_name: Storage type name

        Returns:
            Dictionary with counts for each mode and cache_type

        Raises:
            ValueError: If storage_name is not a supported backend
        """
        if storage_name == "JsonKVStorage":
            return await self.count_query_caches_json(storage)
        elif storage_name == "RedisKVStorage":
            return await self.count_query_caches_redis(storage)
        elif storage_name == "PGKVStorage":
            return await self.count_query_caches_pg(storage)
        elif storage_name == "MongoKVStorage":
            return await self.count_query_caches_mongo(storage)
        else:
            raise ValueError(f"Unsupported storage type: {storage_name}")

    async def delete_query_caches_json(
        self, storage, cleanup_type: str, stats: CleanupStats
    ):
        """Delete query caches from JsonKVStorage

        Snapshot approach: collects all matching keys under the lock first,
        then deletes them in fixed-size batches. A failed batch is recorded in
        stats but does not stop subsequent batches.

        Args:
            storage: JsonKVStorage instance
            cleanup_type: 'all', 'query', or 'keywords'
            stats: CleanupStats object to track progress
        """
        # Collect keys to delete
        async with storage._storage_lock:
            keys_to_delete = []
            for key in storage._data.keys():
                should_delete = False
                for mode in QUERY_MODES:
                    if cleanup_type == "all":
                        if key.startswith(f"{mode}:query:") or key.startswith(
                            f"{mode}:keywords:"
                        ):
                            should_delete = True
                    elif cleanup_type == "query":
                        if key.startswith(f"{mode}:query:"):
                            should_delete = True
                    elif cleanup_type == "keywords":
                        if key.startswith(f"{mode}:keywords:"):
                            should_delete = True

                if should_delete:
                    keys_to_delete.append(key)

        # Delete in batches (ceil division for the batch count)
        total_keys = len(keys_to_delete)
        stats.total_batches = (total_keys + self.batch_size - 1) // self.batch_size

        print("\n=== Starting Cleanup ===")
        print(
            f"šŸ’” Processing {self.batch_size:,} records at a time from JsonKVStorage\n"
        )

        for batch_idx in range(stats.total_batches):
            start_idx = batch_idx * self.batch_size
            end_idx = min((batch_idx + 1) * self.batch_size, total_keys)
            batch_keys = keys_to_delete[start_idx:end_idx]

            try:
                async with storage._storage_lock:
                    for key in batch_keys:
                        del storage._data[key]

                    # CRITICAL: Set update flag so changes persist to disk
                    # Without this, deletions remain in-memory only and are lost on exit
                    await set_all_update_flags(storage.final_namespace)

                # Success
                stats.successful_batches += 1
                stats.successfully_deleted += len(batch_keys)

                # Calculate progress (text progress bar, 20 chars wide)
                progress = (stats.successfully_deleted / total_keys) * 100
                bar_length = 20
                filled_length = int(
                    bar_length * stats.successfully_deleted // total_keys
                )
                bar = "ā–ˆ" * filled_length + "ā–‘" * (bar_length - filled_length)

                print(
                    f"Batch {batch_idx + 1}/{stats.total_batches}: {bar} "
                    f"{stats.successfully_deleted:,}/{total_keys:,} ({progress:.1f}%) āœ“"
                )

            except Exception as e:
                # Record and continue with the next batch
                stats.add_error(batch_idx + 1, e, len(batch_keys))
                print(
                    f"Batch {batch_idx + 1}/{stats.total_batches}: āœ— FAILED - "
                    f"{type(e).__name__}: {str(e)}"
                )

    async def delete_query_caches_redis(
        self, storage, cleanup_type: str, stats: CleanupStats
    ):
        """Delete query caches from RedisKVStorage

        Iterates SCAN per pattern and deletes each returned batch via a
        pipeline. Deleting keys already returned by SCAN is safe; SCAN may
        return a key more than once, and DELETE on a missing key is a no-op,
        so deletion counts here reflect keys submitted, not necessarily
        distinct keys removed.

        Args:
            storage: RedisKVStorage instance
            cleanup_type: 'all', 'query', or 'keywords'
            stats: CleanupStats object to track progress
        """
        # Build patterns to delete
        patterns = []
        for mode in QUERY_MODES:
            if cleanup_type == "all":
                patterns.append(f"{mode}:query:*")
                patterns.append(f"{mode}:keywords:*")
            elif cleanup_type == "query":
                patterns.append(f"{mode}:query:*")
            elif cleanup_type == "keywords":
                patterns.append(f"{mode}:keywords:*")

        print("\n=== Starting Cleanup ===")
        print(f"šŸ’” Processing Redis keys in batches of {self.batch_size:,}\n")

        batch_idx = 0
        total_deleted = 0

        async with storage._get_redis_connection() as redis:
            for pattern in patterns:
                prefixed_pattern = f"{storage.final_namespace}:{pattern}"
                cursor = 0

                while True:
                    cursor, keys = await redis.scan(
                        cursor, match=prefixed_pattern, count=self.batch_size
                    )

                    if keys:
                        batch_idx += 1
                        stats.total_batches += 1

                        try:
                            # Delete batch using pipeline (one round trip)
                            pipe = redis.pipeline()
                            for key in keys:
                                pipe.delete(key)
                            await pipe.execute()

                            # Success
                            stats.successful_batches += 1
                            stats.successfully_deleted += len(keys)
                            total_deleted += len(keys)

                            # Progress
                            print(
                                f"Batch {batch_idx}: Deleted {len(keys):,} keys "
                                f"(Total: {total_deleted:,}) āœ“"
                            )

                        except Exception as e:
                            # Record and continue scanning
                            stats.add_error(batch_idx, e, len(keys))
                            print(
                                f"Batch {batch_idx}: āœ— FAILED - "
                                f"{type(e).__name__}: {str(e)}"
                            )

                    if cursor == 0:
                        break

                # Yield to the event loop between patterns
                await asyncio.sleep(0)

    async def delete_query_caches_pg(
        self, storage, cleanup_type: str, stats: CleanupStats
    ):
        """Delete query caches from PostgreSQL

        Single server-side DELETE with OR'ed LIKE conditions. The LIKE
        patterns are built from the internal QUERY_MODES/cleanup_type
        constants only (no user input), and workspace is bound as a
        parameter.

        Args:
            storage: PGKVStorage instance
            cleanup_type: 'all', 'query', or 'keywords'
            stats: CleanupStats object to track progress
        """
        from lightrag.kg.postgres_impl import namespace_to_table_name

        table_name = namespace_to_table_name(storage.namespace)

        # Build WHERE conditions
        conditions = []
        for mode in QUERY_MODES:
            if cleanup_type == "all":
                conditions.append(f"id LIKE '{mode}:query:%'")
                conditions.append(f"id LIKE '{mode}:keywords:%'")
            elif cleanup_type == "query":
                conditions.append(f"id LIKE '{mode}:query:%'")
            elif cleanup_type == "keywords":
                conditions.append(f"id LIKE '{mode}:keywords:%'")

        where_clause = " OR ".join(conditions)

        print("\n=== Starting Cleanup ===")
        print("šŸ’” Executing PostgreSQL DELETE query\n")

        try:
            query = f"""
                DELETE FROM {table_name}
                WHERE workspace = $1
                AND ({where_clause})
            """

            start_time = time.time()
            # Fix: Pass dict instead of list for execute() method
            # NOTE(review): confirm db.execute maps dict values to positional
            # $1-style placeholders — verify against PGKVStorage.db API
            await storage.db.execute(query, {"workspace": storage.workspace})
            elapsed = time.time() - start_time

            # PostgreSQL returns deletion count
            # NOTE(review): the actual rowcount is not read here; the code
            # assumes all total_to_delete rows were removed. The post-cleanup
            # recount is what verifies the real outcome.
            stats.total_batches = 1
            stats.successful_batches = 1
            stats.successfully_deleted = stats.total_to_delete

            print(f"āœ“ Deleted {stats.successfully_deleted:,} records in {elapsed:.2f}s")

        except Exception as e:
            # All-or-nothing: a failed DELETE counts every record as lost
            stats.add_error(1, e, stats.total_to_delete)
            print(f"āœ— DELETE failed: {type(e).__name__}: {str(e)}")

    async def delete_query_caches_mongo(
        self, storage, cleanup_type: str, stats: CleanupStats
    ):
        """Delete query caches from MongoDB

        One delete_many() per regex pattern; each pattern is treated as one
        "batch" in stats. delete_many reports exact deleted counts.

        Args:
            storage: MongoKVStorage instance
            cleanup_type: 'all', 'query', or 'keywords'
            stats: CleanupStats object to track progress
        """
        # Build regex patterns (anchored prefixes on _id)
        patterns = []
        for mode in QUERY_MODES:
            if cleanup_type == "all":
                patterns.append(f"^{mode}:query:")
                patterns.append(f"^{mode}:keywords:")
            elif cleanup_type == "query":
                patterns.append(f"^{mode}:query:")
            elif cleanup_type == "keywords":
                patterns.append(f"^{mode}:keywords:")

        print("\n=== Starting Cleanup ===")
        print("šŸ’” Executing MongoDB deleteMany operations\n")

        total_deleted = 0
        for idx, pattern in enumerate(patterns, 1):
            try:
                query = {"_id": {"$regex": pattern}}
                result = await storage._data.delete_many(query)
                deleted_count = result.deleted_count

                stats.total_batches += 1
                stats.successful_batches += 1
                stats.successfully_deleted += deleted_count
                total_deleted += deleted_count

                print(
                    f"Pattern {idx}/{len(patterns)}: Deleted {deleted_count:,} records āœ“"
                )

            except Exception as e:
                # batch_size=0: the per-pattern loss count is unknown before
                # delete_many runs, so failed_to_delete is not incremented here
                stats.add_error(idx, e, 0)
                print(
                    f"Pattern {idx}/{len(patterns)}: āœ— FAILED - "
                    f"{type(e).__name__}: {str(e)}"
                )

        print(f"\nTotal deleted: {total_deleted:,} records")

    async def delete_query_caches(
        self, storage, storage_name: str, cleanup_type: str, stats: CleanupStats
    ):
        """Delete query caches from any storage type

        Args:
            storage: Storage instance
            storage_name: Storage type name
            cleanup_type: 'all', 'query', or 'keywords'
            stats: CleanupStats object to track progress
        """
        # Dispatch to the backend-specific cleanup implementation.
        if storage_name == "JsonKVStorage":
            await self.delete_query_caches_json(storage, cleanup_type, stats)
        elif storage_name == "RedisKVStorage":
            await self.delete_query_caches_redis(storage, cleanup_type, stats)
        elif storage_name == "PGKVStorage":
            await self.delete_query_caches_pg(storage, cleanup_type, stats)
        elif storage_name == "MongoKVStorage":
            await self.delete_query_caches_mongo(storage, cleanup_type, stats)
        else:
            raise ValueError(f"Unsupported storage type: {storage_name}")

    def print_header(self):
        """Print tool header"""
        print("\n" + "=" * 60)
        print("LLM Query Cache Cleanup Tool - LightRAG")
        print("=" * 60)

    def print_storage_types(self):
        """Print available storage types"""
        print("\nSupported KV Storage Types:")
        for key, value in STORAGE_TYPES.items():
            print(f"[{key}] {value}")

    def format_workspace(self, workspace: str) -> str:
        """Format workspace name with highlighting

        Args:
            workspace: Workspace name (may be empty)

        Returns:
            Formatted workspace string with ANSI color codes
        """
        if workspace:
            return f"{BOLD_CYAN}{workspace}{RESET}"
        else:
            return f"{BOLD_CYAN}(default){RESET}"

    def print_cache_statistics(self, counts: Dict[str, Dict[str, int]], title: str):
        """Print cache statistics in a formatted table

        Args:
            counts: Dictionary with counts for each mode and cache_type
            title: Title for the statistics display
        """
        print(f"\n{title}")
        print("ā”Œ" + "─" * 12 + "┬" + "─" * 12 + "┬" + "─" * 12 + "┬" + "─" * 12 + "┐")
        # NOTE(review): header cells use :<10 (left-aligned) while the data
        # rows below use :>10 (right-aligned) — columns won't line up visually.
        print(f"│ {'Mode':<10} │ {'Query':<10} │ {'Keywords':<10} │ {'Total':<10} │")
        print("ā”œ" + "─" * 12 + "┼" + "─" * 12 + "┼" + "─" * 12 + "┼" + "─" * 12 + "┤")

        total_query = 0
        total_keywords = 0

        for mode in QUERY_MODES:
            query_count = counts[mode]["query"]
            keywords_count = counts[mode]["keywords"]
            mode_total = query_count + keywords_count

            total_query += query_count
            total_keywords += keywords_count

            print(
                f"│ {mode:<10} │ {query_count:>10,} │ {keywords_count:>10,} │ {mode_total:>10,} │"
            )

        print("ā”œ" + "─" * 12 + "┼" + "─" * 12 + "┼" + "─" * 12 + "┼" + "─" * 12 + "┤")
        grand_total = total_query + total_keywords
        print(
            f"│ {'Total':<10} │ {total_query:>10,} │ {total_keywords:>10,} │ {grand_total:>10,} │"
        )
        # NOTE(review): bottom border uses "┓" (down-right corner) where a
        # bottom tee "ā”“" is presumably intended — verify the rendered table.
        print("ā””" + "─" * 12 + "┓" + "─" * 12 + "┓" + "─" * 12 + "┓" + "─" * 12 + "ā”˜")

    def calculate_total_to_delete(
        self, counts: Dict[str, Dict[str, int]], cleanup_type: str
    ) -> int:
        """Calculate total number of records to delete

        Args:
            counts: Dictionary with counts for each mode and cache_type
            cleanup_type: 'all', 'query', or 'keywords'

        Returns:
            Total number of records to delete
        """
        total = 0
        for mode in QUERY_MODES:
            if cleanup_type == "all":
                total += counts[mode]["query"] + counts[mode]["keywords"]
            elif cleanup_type == "query":
                total += counts[mode]["query"]
            elif cleanup_type == "keywords":
                total += counts[mode]["keywords"]
        return total

    def print_cleanup_report(self, stats: CleanupStats):
        """Print comprehensive cleanup report

        Args:
            stats: CleanupStats object with cleanup results
        """
        print("\n" + "=" * 60)
        print("Cleanup Complete - Final Report")
        print("=" * 60)

        # Overall statistics
        print("\nšŸ“Š Statistics:")
        print(f"  Total records to delete: {stats.total_to_delete:,}")
        print(f"  Total batches: {stats.total_batches:,}")
        print(f"  Successful batches: {stats.successful_batches:,}")
        print(f"  Failed batches: {stats.failed_batches:,}")
        print(f"  Successfully deleted: {stats.successfully_deleted:,}")
        print(f"  Failed to delete: {stats.failed_to_delete:,}")

        # Success rate
        # Guard against division by zero when nothing was scheduled.
        success_rate = (
            (stats.successfully_deleted / stats.total_to_delete * 100)
            if stats.total_to_delete > 0
            else 0
        )
        print(f"  Success rate: {success_rate:.2f}%")

        # Before/After comparison
        print("\nšŸ“ˆ Before/After Comparison:")
        total_before = sum(
            counts["query"] + counts["keywords"]
            for counts in stats.counts_before.values()
        )
        total_after = sum(
            counts["query"] + counts["keywords"]
            for counts in stats.counts_after.values()
        )
        print(f"  Total caches before: {total_before:,}")
        print(f"  Total caches after: {total_after:,}")
        print(f"  Net reduction: {total_before - total_after:,}")

        # Error details
        if stats.errors:
            print(f"\nāš ļø Errors encountered: {len(stats.errors)}")
            print("\nError Details:")
            print("-" * 60)

            # Group errors by type
            error_types = {}
            for error in stats.errors:
                err_type = error["error_type"]
                error_types[err_type] = error_types.get(err_type, 0) + 1

            print("\nError Summary:")
            # Most frequent error type first.
            for err_type, count in sorted(error_types.items(), key=lambda x: -x[1]):
                print(f"  - {err_type}: {count} occurrence(s)")

            print("\nFirst 5 errors:")
            for i, error in enumerate(stats.errors[:5], 1):
                print(f"\n  {i}. Batch {error['batch']}")
                print(f"     Type: {error['error_type']}")
                print(f"     Message: {error['error_msg']}")
                print(f"     Records lost: {error['records_lost']:,}")

            if len(stats.errors) > 5:
                print(f"\n  ... and {len(stats.errors) - 5} more errors")

            print("\n" + "=" * 60)
            print(f"{BOLD_RED}āš ļø WARNING: Cleanup completed with errors!{RESET}")
            print("  Please review the error details above.")
            print("=" * 60)
        else:
            print("\n" + "=" * 60)
            print(f"{BOLD_GREEN}āœ“ SUCCESS: All records cleaned up successfully!{RESET}")
            print("=" * 60)

    async def setup_storage(self) -> tuple:
        """Setup and initialize storage

        Returns:
            Tuple of (storage_instance, storage_name, workspace)
            Returns (None, None, None) if user chooses to exit
        """
        print("\n=== Storage Setup ===")
        self.print_storage_types()

        # Custom input handling with exit support
        while True:
            choice = input(
                "\nSelect storage type (1-4) (Press Enter to exit): "
            ).strip()

            # Check for exit
            if choice == "" or choice == "0":
                print("\nāœ“ Cleanup cancelled by user")
                return None, None, None

            # Check if choice is valid
            if choice in STORAGE_TYPES:
                break

            print(
                f"āœ— Invalid choice. Please enter one of: {', '.join(STORAGE_TYPES.keys())}"
            )

        storage_name = STORAGE_TYPES[choice]

        # Check configuration (warnings only, doesn't block)
        print("\nChecking configuration...")
        self.check_env_vars(storage_name)

        # Get workspace
        workspace = self.get_workspace_for_storage(storage_name)

        # Initialize storage (real validation point)
        print("\nInitializing storage...")
        try:
            storage = await self.initialize_storage(storage_name, workspace)
            print(f"- Storage Type: {storage_name}")
            print(f"- Workspace: {workspace if workspace else '(default)'}")
            print("- Connection Status: āœ“ Success")

        except Exception as e:
            print(f"āœ— Initialization failed: {e}")
            print(f"\nFor {storage_name}, you can configure using:")
            print("  1. Environment variables (highest priority)")

            # Show specific environment variable requirements
            if storage_name in STORAGE_ENV_REQUIREMENTS:
                for var in STORAGE_ENV_REQUIREMENTS[storage_name]:
                    print(f"     - {var}")

            print("  2. 
config.ini file (medium priority)")
            if storage_name == "RedisKVStorage":
                print("     [redis]")
                print("     uri = redis://localhost:6379")
            elif storage_name == "PGKVStorage":
                print("     [postgres]")
                print("     host = localhost")
                print("     port = 5432")
                print("     user = postgres")
                print("     password = yourpassword")
                print("     database = lightrag")
            elif storage_name == "MongoKVStorage":
                print("     [mongodb]")
                print("     uri = mongodb://root:root@localhost:27017/")
                print("     database = LightRAG")

            return None, None, None

        return storage, storage_name, workspace

    async def run(self):
        """Run the cleanup tool

        Interactive workflow: initialize shared storage, select and connect a
        backend, count caches, prompt for a cleanup type, confirm, delete,
        persist, verify, and report. Always finalizes storage on exit.
        """
        try:
            # Initialize shared storage (REQUIRED for storage classes to work)
            from lightrag.kg.shared_storage import initialize_share_data

            initialize_share_data(workers=1)

            # Print header
            self.print_header()

            # Setup storage
            self.storage, storage_name, self.workspace = await self.setup_storage()

            # Check if user cancelled
            if self.storage is None:
                return

            # Count query caches
            print("\nCounting query cache records...")
            try:
                counts = await self.count_query_caches(self.storage, storage_name)
            except Exception as e:
                print(f"āœ— Counting failed: {e}")
                await self.storage.finalize()
                return

            # Initialize stats
            stats = CleanupStats()
            stats.initialize_counts()
            stats.counts_before = counts

            # Print statistics
            self.print_cache_statistics(
                counts, "šŸ“Š Query Cache Statistics (Before Cleanup):"
            )

            # Calculate total
            total_caches = sum(
                counts[mode]["query"] + counts[mode]["keywords"] for mode in QUERY_MODES
            )

            if total_caches == 0:
                print("\nāš ļø No query caches found in storage")
                await self.storage.finalize()
                return

            # Select cleanup type
            print("\n=== Cleanup Options ===")
            print("[1] Delete all query caches (both query and keywords)")
            print("[2] Delete query caches only (keep keywords)")
            print("[3] Delete keywords caches only (keep query)")
            print("[0] Cancel")

            while True:
                choice = input("\nSelect cleanup option (0-3): ").strip()

                # Empty input is treated the same as an explicit cancel.
                if choice == "0" or choice == "":
                    print("\nāœ“ Cleanup cancelled")
                    await self.storage.finalize()
                    return
                elif choice == "1":
                    cleanup_type = "all"
                    break
                elif choice == "2":
                    cleanup_type = "query"
                    break
                elif choice == "3":
                    cleanup_type = "keywords"
                    break
                else:
                    print("āœ— Invalid choice. Please enter 0, 1, 2, or 3")

            # Calculate total to delete
            stats.total_to_delete = self.calculate_total_to_delete(counts, cleanup_type)

            # Confirm deletion
            print("\n" + "=" * 60)
            print("Cleanup Confirmation")
            print("=" * 60)
            print(
                f"Storage: {BOLD_CYAN}{storage_name}{RESET} "
                f"(workspace: {self.format_workspace(self.workspace)})"
            )
            print(f"Cleanup Type: {BOLD_CYAN}{cleanup_type}{RESET}")
            print(
                f"Records to Delete: {BOLD_RED}{stats.total_to_delete:,}{RESET} / {total_caches:,}"
            )

            if cleanup_type == "all":
                print(
                    f"\n{BOLD_RED}āš ļø WARNING: This will delete ALL query caches across all modes!{RESET}"
                )
            elif cleanup_type == "query":
                print("\nāš ļø This will delete query caches only (keywords will be kept)")
            elif cleanup_type == "keywords":
                print("\nāš ļø This will delete keywords caches only (query will be kept)")

            # Only an explicit "y" proceeds; anything else aborts safely.
            confirm = input("\nContinue with deletion? (y/n): ").strip().lower()
            if confirm != "y":
                print("\nāœ“ Cleanup cancelled")
                await self.storage.finalize()
                return

            # Perform deletion
            await self.delete_query_caches(
                self.storage, storage_name, cleanup_type, stats
            )

            # Persist changes
            print("\nPersisting changes to storage...")
            try:
                await self.storage.index_done_callback()
                print("āœ“ Changes persisted successfully")
            except Exception as e:
                print(f"āœ— Persist failed: {e}")
                stats.add_error(0, e, 0)

            # Count again to verify
            print("\nVerifying cleanup results...")
            try:
                stats.counts_after = await self.count_query_caches(
                    self.storage, storage_name
                )
            except Exception as e:
                print(f"āš ļø Verification failed: {e}")
                # Use zero counts if verification fails
                stats.counts_after = {
                    mode: {"query": 0, "keywords": 0} for mode in QUERY_MODES
                }

            # Print final report
            self.print_cleanup_report(stats)

            # Print after statistics
            self.print_cache_statistics(
                stats.counts_after, "\nšŸ“Š Query Cache Statistics (After Cleanup):"
            )

            # Cleanup
            await self.storage.finalize()

        except KeyboardInterrupt:
            print("\n\nāœ— Cleanup interrupted by user")
        except Exception as e:
            print(f"\nāœ— Cleanup failed: {e}")
            import traceback

            traceback.print_exc()
        finally:
            # Ensure cleanup
            # finalize() may run twice on the success path; failures here are
            # deliberately swallowed since we are already shutting down.
            if self.storage:
                try:
                    await self.storage.finalize()
                except Exception:
                    pass

            # Finalize shared storage
            try:
                from lightrag.kg.shared_storage import finalize_share_data

                finalize_share_data()
            except Exception:
                pass


async def main():
    """Main entry point"""
    tool = CleanupTool()
    await tool.run()


if __name__ == "__main__":
    asyncio.run(main())

From 37b711890173d1646193622c1afa04f4083ba43c Mon Sep 17 00:00:00 2001
From: yangdx
Date: Sun, 9 Nov 2025 14:17:56 +0800
Subject: [PATCH 2/2] Fix table alignment and add validation for empty cleanup
 selections

---
 lightrag/tools/clean_llm_query_cache.py | 30 ++++++++++++++++++++-----
 1 file changed, 24 
insertions(+), 6 deletions(-) diff --git a/lightrag/tools/clean_llm_query_cache.py b/lightrag/tools/clean_llm_query_cache.py index caa1ad06..1b688c29 100644 --- a/lightrag/tools/clean_llm_query_cache.py +++ b/lightrag/tools/clean_llm_query_cache.py @@ -719,7 +719,7 @@ class CleanupTool: """ print(f"\n{title}") print("ā”Œ" + "─" * 12 + "┬" + "─" * 12 + "┬" + "─" * 12 + "┬" + "─" * 12 + "┐") - print(f"│ {'Mode':<10} │ {'Query':<10} │ {'Keywords':<10} │ {'Total':<10} │") + print(f"│ {'Mode':<10} │ {'Query':>10} │ {'Keywords':>10} │ {'Total':>10} │") print("ā”œ" + "─" * 12 + "┼" + "─" * 12 + "┼" + "─" * 12 + "┼" + "─" * 12 + "┤") total_query = 0 @@ -981,18 +981,36 @@ class CleanupTool: return elif choice == "1": cleanup_type = "all" - break elif choice == "2": cleanup_type = "query" - break elif choice == "3": cleanup_type = "keywords" - break else: print("āœ— Invalid choice. Please enter 0, 1, 2, or 3") + continue - # Calculate total to delete - stats.total_to_delete = self.calculate_total_to_delete(counts, cleanup_type) + # Calculate total to delete for the selected type + stats.total_to_delete = self.calculate_total_to_delete( + counts, cleanup_type + ) + + # Check if there are any records to delete + if stats.total_to_delete == 0: + if cleanup_type == "all": + print(f"\n{BOLD_RED}āš ļø No query caches found to delete!{RESET}") + elif cleanup_type == "query": + print( + f"\n{BOLD_RED}āš ļø No query caches found to delete! (Only keywords exist){RESET}" + ) + elif cleanup_type == "keywords": + print( + f"\n{BOLD_RED}āš ļø No keywords caches found to delete! (Only query caches exist){RESET}" + ) + print(" Please select a different cleanup option.\n") + continue + + # Valid selection with records to delete + break # Confirm deletion print("\n" + "=" * 60)