feat: Add preview step to delete command

Signed-off-by: shehab-badawy <shehab.badawy001@gmail.com>
This commit is contained in:
shehab-badawy 2025-09-11 16:06:20 -04:00
parent 726d4d8535
commit de162cb491
2 changed files with 99 additions and 13 deletions

View file

@ -6,6 +6,7 @@ from cognee.cli.reference import SupportsCliCommand
from cognee.cli import DEFAULT_DOCS_URL
import cognee.cli.echo as fmt
from cognee.cli.exceptions import CliCommandException, CliCommandInnerException
from cognee.modules.data.methods.get_deletion_counts import get_deletion_counts
class DeleteCommand(SupportsCliCommand):
@ -41,29 +42,56 @@ Be careful with deletion operations as they are irreversible.
fmt.error("Please specify what to delete: --dataset-name, --user-id, or --all")
return
# Build confirmation message
if args.all:
confirm_msg = "Delete ALL data from cognee?"
operation = "all data"
elif args.dataset_name:
confirm_msg = f"Delete dataset '{args.dataset_name}'?"
operation = f"dataset '{args.dataset_name}'"
elif args.user_id:
confirm_msg = f"Delete all data for user '{args.user_id}'?"
operation = f"data for user '{args.user_id}'"
# Confirm deletion unless forced
# If --force is used, skip the preview and go straight to deletion
if not args.force:
# --- START PREVIEW LOGIC ---
fmt.echo("Gathering data for preview...")
preview_data = asyncio.run(
get_deletion_counts(
dataset_name=args.dataset_name,
user_id=args.user_id,
all_data=args.all,
)
)
if not preview_data or all(value == 0 for value in preview_data.values()):
fmt.success("No data found to delete.")
return
fmt.echo("You are about to delete:")
if "datasets" in preview_data and preview_data["datasets"] > 0:
fmt.echo(f"- {preview_data['datasets']} datasets")
if "data_entries" in preview_data and preview_data["data_entries"] > 0:
fmt.echo(f"- {preview_data['data_entries']} data entries")
if "users" in preview_data and preview_data["users"] > 0:
fmt.echo(
f"- {preview_data['users']} {'users' if preview_data['users'] > 1 else 'user'}"
)
fmt.echo("-" * 20)
fmt.warning("This operation is irreversible!")
if not fmt.confirm(confirm_msg):
if not fmt.confirm("Proceed?"):
fmt.echo("Deletion cancelled.")
return
# --- END PREVIEW LOGIC ---
# Build operation message for success/failure logging
if args.all:
operation = "all data"
elif args.dataset_name:
operation = f"dataset '{args.dataset_name}'"
elif args.user_id:
operation = f"data for user '{args.user_id}'"
else:
operation = "data"
fmt.echo(f"Deleting {operation}...")
# Run the async delete function
async def run_delete():
try:
# NOTE: The underlying cognee.delete() function is currently not working as expected.
# This is a separate bug that this preview feature helps to expose.
if args.all:
await cognee.delete(dataset_name=None, user_id=args.user_id)
else:
@ -72,6 +100,7 @@ Be careful with deletion operations as they are irreversible.
raise CliCommandInnerException(f"Failed to delete: {str(e)}")
asyncio.run(run_delete())
# This success message may be inaccurate due to the underlying bug, but we leave it for now.
fmt.success(f"Successfully deleted {operation}")
except Exception as e:

View file

@ -0,0 +1,57 @@
from sqlalchemy import select
from sqlalchemy.sql import func
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.data.models import Dataset, Data, DatasetData
from cognee.modules.users.models import User
async def get_deletion_counts(
    dataset_name: str = None, user_id: str = None, all_data: bool = False
) -> dict:
    """
    Calculates the number of items that will be deleted based on the provided arguments.

    The arguments are checked in a fixed order and the first one that is set
    decides what gets counted: ``dataset_name``, then ``all_data``, then
    ``user_id``.

    Args:
        dataset_name: Name of the dataset(s) to count. Every ``Dataset`` row
            with this name is included.
        user_id: Identifier of a user. Counting for a single user is not
            implemented yet; see the note in the body.
        all_data: When true, count every row in the ``Dataset``, ``Data`` and
            ``User`` tables.

    Returns:
        A dict mapping a category name to a row count:

        * ``dataset_name`` set: ``{"datasets", "data_entries"}`` where
          ``data_entries`` is the number of ``DatasetData`` link rows that
          point at the matched dataset(s).
        * ``all_data`` set: ``{"datasets", "data_entries", "users"}``.
        * ``user_id`` set: a fixed placeholder dict (not real counts).
        * nothing set: an empty dict.
    """
    relational_engine = get_relational_engine()

    async with relational_engine.get_async_session() as session:

        async def count_rows(model) -> int:
            # One COUNT(*) round trip against the table mapped by `model`.
            return (
                await session.execute(select(func.count()).select_from(model))
            ).scalar_one()

        if dataset_name:
            # Collect the ids of *all* datasets with this name. Fetching a
            # single row with scalar_one_or_none() raises MultipleResultsFound
            # as soon as two datasets share a name, which would crash the
            # preview instead of informing the user.
            matching_ids = (
                (await session.execute(select(Dataset.id).where(Dataset.name == dataset_name)))
                .scalars()
                .all()
            )

            if not matching_ids:
                return {"datasets": 0, "data_entries": 0}

            # Count data entries linked to the matched dataset(s).
            link_count_query = (
                select(func.count())
                .select_from(DatasetData)
                .where(DatasetData.dataset_id.in_(matching_ids))
            )
            data_entry_count = (await session.execute(link_count_query)).scalar_one()

            return {"datasets": len(matching_ids), "data_entries": data_entry_count}

        if all_data:
            return {
                "datasets": await count_rows(Dataset),
                "data_entries": await count_rows(Data),
                "users": await count_rows(User),
            }

        # Placeholder for user_id logic
        if user_id:
            # TODO: Implement counting logic for a specific user.
            # NOTE: these are NOT real counts. The values are hard-coded so that
            # callers which treat an all-zero result as "nothing to delete"
            # still proceed; no query is executed for this branch.
            return {"datasets": 0, "data_entries": 0, "users": 1}

    # No selector was provided, so there is nothing to count.
    return {}