feat: Add database deletion on dataset delete (#1893)
<!-- .github/pull_request_template.md -->
## Description
- Add support for database deletion when dataset is deleted
- Simplify dataset handler usage in Cognee
## Type of Change
<!-- Please check the relevant option -->
- [x] Bug fix (non-breaking change that fixes an issue)
- [ ] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Documentation update
- [ ] Code refactoring
- [ ] Performance improvement
- [ ] Other (please specify):
## Screenshots/Videos (if applicable)
<!-- Add screenshots or videos to help explain your changes -->
## Pre-submission Checklist
<!-- Please check all boxes that apply before submitting your PR -->
- [ ] **I have tested my changes thoroughly before submitting this PR**
- [ ] **This PR contains minimal changes necessary to address the
issue/feature**
- [ ] My code follows the project's coding standards and style
guidelines
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have added necessary documentation (if applicable)
- [ ] All new and existing tests pass
- [ ] I have searched existing PRs to ensure this change hasn't been
submitted already
- [ ] I have linked any relevant issues in the description
- [ ] My commits have clear and descriptive messages
## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit
* **Bug Fixes**
* Improved dataset deletion: stronger authorization checks and reliable
removal of associated graph and vector storage.
* **Tests**
* Added end-to-end test to verify complete dataset deletion and cleanup
of all related storage components.
<sub>✏️ Tip: You can customize this high-level summary in your review
settings.</sub>
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
This commit is contained in:
parent
6b86f423ff
commit
14d9540d1b
11 changed files with 183 additions and 51 deletions
25
.github/workflows/e2e_tests.yml
vendored
25
.github/workflows/e2e_tests.yml
vendored
|
|
@ -237,6 +237,31 @@ jobs:
|
||||||
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
|
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
|
||||||
run: uv run python ./cognee/tests/test_dataset_database_handler.py
|
run: uv run python ./cognee/tests/test_dataset_database_handler.py
|
||||||
|
|
||||||
|
test-dataset-database-deletion:
|
||||||
|
name: Test dataset database deletion in Cognee
|
||||||
|
runs-on: ubuntu-22.04
|
||||||
|
steps:
|
||||||
|
- name: Check out repository
|
||||||
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Cognee Setup
|
||||||
|
uses: ./.github/actions/cognee_setup
|
||||||
|
with:
|
||||||
|
python-version: '3.11.x'
|
||||||
|
|
||||||
|
- name: Run dataset databases deletion test
|
||||||
|
env:
|
||||||
|
ENV: 'dev'
|
||||||
|
LLM_MODEL: ${{ secrets.LLM_MODEL }}
|
||||||
|
LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }}
|
||||||
|
LLM_API_KEY: ${{ secrets.LLM_API_KEY }}
|
||||||
|
LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }}
|
||||||
|
EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }}
|
||||||
|
EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }}
|
||||||
|
EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }}
|
||||||
|
EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }}
|
||||||
|
run: uv run python ./cognee/tests/test_dataset_delete.py
|
||||||
|
|
||||||
test-permissions:
|
test-permissions:
|
||||||
name: Test permissions with different situations in Cognee
|
name: Test permissions with different situations in Cognee
|
||||||
runs-on: ubuntu-22.04
|
runs-on: ubuntu-22.04
|
||||||
|
|
|
||||||
|
|
@ -53,7 +53,7 @@ async def cognify(
|
||||||
custom_prompt: Optional[str] = None,
|
custom_prompt: Optional[str] = None,
|
||||||
temporal_cognify: bool = False,
|
temporal_cognify: bool = False,
|
||||||
data_per_batch: int = 20,
|
data_per_batch: int = 20,
|
||||||
**kwargs
|
**kwargs,
|
||||||
):
|
):
|
||||||
"""
|
"""
|
||||||
Transform ingested data into a structured knowledge graph.
|
Transform ingested data into a structured knowledge graph.
|
||||||
|
|
|
||||||
|
|
@ -208,14 +208,14 @@ def get_datasets_router() -> APIRouter:
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
from cognee.modules.data.methods import get_dataset, delete_dataset
|
from cognee.modules.data.methods import delete_dataset
|
||||||
|
|
||||||
dataset = await get_dataset(user.id, dataset_id)
|
dataset = await get_authorized_existing_datasets([dataset_id], "delete", user)
|
||||||
|
|
||||||
if dataset is None:
|
if dataset is None:
|
||||||
raise DatasetNotFoundError(message=f"Dataset ({str(dataset_id)}) not found.")
|
raise DatasetNotFoundError(message=f"Dataset ({str(dataset_id)}) not found.")
|
||||||
|
|
||||||
await delete_dataset(dataset)
|
await delete_dataset(dataset[0])
|
||||||
|
|
||||||
@router.delete(
|
@router.delete(
|
||||||
"/{dataset_id}/data/{data_id}",
|
"/{dataset_id}/data/{data_id}",
|
||||||
|
|
|
||||||
|
|
@ -1,2 +1,4 @@
|
||||||
from .get_or_create_dataset_database import get_or_create_dataset_database
|
from .get_or_create_dataset_database import get_or_create_dataset_database
|
||||||
from .resolve_dataset_database_connection_info import resolve_dataset_database_connection_info
|
from .resolve_dataset_database_connection_info import resolve_dataset_database_connection_info
|
||||||
|
from .get_graph_dataset_database_handler import get_graph_dataset_database_handler
|
||||||
|
from .get_vector_dataset_database_handler import get_vector_dataset_database_handler
|
||||||
|
|
|
||||||
|
|
@ -0,0 +1,10 @@
|
||||||
|
from cognee.modules.users.models.DatasetDatabase import DatasetDatabase
|
||||||
|
|
||||||
|
|
||||||
|
def get_graph_dataset_database_handler(dataset_database: DatasetDatabase) -> dict:
|
||||||
|
from cognee.infrastructure.databases.dataset_database_handler.supported_dataset_database_handlers import (
|
||||||
|
supported_dataset_database_handlers,
|
||||||
|
)
|
||||||
|
|
||||||
|
handler = supported_dataset_database_handlers[dataset_database.graph_dataset_database_handler]
|
||||||
|
return handler
|
||||||
|
|
@ -0,0 +1,10 @@
|
||||||
|
from cognee.modules.users.models.DatasetDatabase import DatasetDatabase
|
||||||
|
|
||||||
|
|
||||||
|
def get_vector_dataset_database_handler(dataset_database: DatasetDatabase) -> dict:
|
||||||
|
from cognee.infrastructure.databases.dataset_database_handler.supported_dataset_database_handlers import (
|
||||||
|
supported_dataset_database_handlers,
|
||||||
|
)
|
||||||
|
|
||||||
|
handler = supported_dataset_database_handlers[dataset_database.vector_dataset_database_handler]
|
||||||
|
return handler
|
||||||
|
|
@ -1,24 +1,12 @@
|
||||||
|
from cognee.infrastructure.databases.utils.get_graph_dataset_database_handler import (
|
||||||
|
get_graph_dataset_database_handler,
|
||||||
|
)
|
||||||
|
from cognee.infrastructure.databases.utils.get_vector_dataset_database_handler import (
|
||||||
|
get_vector_dataset_database_handler,
|
||||||
|
)
|
||||||
from cognee.modules.users.models.DatasetDatabase import DatasetDatabase
|
from cognee.modules.users.models.DatasetDatabase import DatasetDatabase
|
||||||
|
|
||||||
|
|
||||||
async def _get_vector_db_connection_info(dataset_database: DatasetDatabase) -> DatasetDatabase:
|
|
||||||
from cognee.infrastructure.databases.dataset_database_handler.supported_dataset_database_handlers import (
|
|
||||||
supported_dataset_database_handlers,
|
|
||||||
)
|
|
||||||
|
|
||||||
handler = supported_dataset_database_handlers[dataset_database.vector_dataset_database_handler]
|
|
||||||
return await handler["handler_instance"].resolve_dataset_connection_info(dataset_database)
|
|
||||||
|
|
||||||
|
|
||||||
async def _get_graph_db_connection_info(dataset_database: DatasetDatabase) -> DatasetDatabase:
|
|
||||||
from cognee.infrastructure.databases.dataset_database_handler.supported_dataset_database_handlers import (
|
|
||||||
supported_dataset_database_handlers,
|
|
||||||
)
|
|
||||||
|
|
||||||
handler = supported_dataset_database_handlers[dataset_database.graph_dataset_database_handler]
|
|
||||||
return await handler["handler_instance"].resolve_dataset_connection_info(dataset_database)
|
|
||||||
|
|
||||||
|
|
||||||
async def resolve_dataset_database_connection_info(
|
async def resolve_dataset_database_connection_info(
|
||||||
dataset_database: DatasetDatabase,
|
dataset_database: DatasetDatabase,
|
||||||
) -> DatasetDatabase:
|
) -> DatasetDatabase:
|
||||||
|
|
@ -31,6 +19,12 @@ async def resolve_dataset_database_connection_info(
|
||||||
Returns:
|
Returns:
|
||||||
DatasetDatabase instance with resolved connection info
|
DatasetDatabase instance with resolved connection info
|
||||||
"""
|
"""
|
||||||
dataset_database = await _get_vector_db_connection_info(dataset_database)
|
vector_dataset_database_handler = get_vector_dataset_database_handler(dataset_database)
|
||||||
dataset_database = await _get_graph_db_connection_info(dataset_database)
|
graph_dataset_database_handler = get_graph_dataset_database_handler(dataset_database)
|
||||||
|
dataset_database = await vector_dataset_database_handler[
|
||||||
|
"handler_instance"
|
||||||
|
].resolve_dataset_connection_info(dataset_database)
|
||||||
|
dataset_database = await graph_dataset_database_handler[
|
||||||
|
"handler_instance"
|
||||||
|
].resolve_dataset_connection_info(dataset_database)
|
||||||
return dataset_database
|
return dataset_database
|
||||||
|
|
|
||||||
|
|
@ -31,7 +31,10 @@ class LLMGateway:
|
||||||
|
|
||||||
llm_client = get_llm_client()
|
llm_client = get_llm_client()
|
||||||
return llm_client.acreate_structured_output(
|
return llm_client.acreate_structured_output(
|
||||||
text_input=text_input, system_prompt=system_prompt, response_model=response_model, **kwargs
|
text_input=text_input,
|
||||||
|
system_prompt=system_prompt,
|
||||||
|
response_model=response_model,
|
||||||
|
**kwargs,
|
||||||
)
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
|
|
|
||||||
|
|
@ -5,6 +5,10 @@ from cognee.context_global_variables import backend_access_control_enabled
|
||||||
from cognee.infrastructure.databases.vector import get_vector_engine
|
from cognee.infrastructure.databases.vector import get_vector_engine
|
||||||
from cognee.infrastructure.databases.graph.get_graph_engine import get_graph_engine
|
from cognee.infrastructure.databases.graph.get_graph_engine import get_graph_engine
|
||||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||||
|
from cognee.infrastructure.databases.utils import (
|
||||||
|
get_graph_dataset_database_handler,
|
||||||
|
get_vector_dataset_database_handler,
|
||||||
|
)
|
||||||
from cognee.shared.cache import delete_cache
|
from cognee.shared.cache import delete_cache
|
||||||
from cognee.modules.users.models import DatasetDatabase
|
from cognee.modules.users.models import DatasetDatabase
|
||||||
from cognee.shared.logging_utils import get_logger
|
from cognee.shared.logging_utils import get_logger
|
||||||
|
|
@ -13,22 +17,13 @@ logger = get_logger()
|
||||||
|
|
||||||
|
|
||||||
async def prune_graph_databases():
|
async def prune_graph_databases():
|
||||||
async def _prune_graph_db(dataset_database: DatasetDatabase) -> dict:
|
|
||||||
from cognee.infrastructure.databases.dataset_database_handler.supported_dataset_database_handlers import (
|
|
||||||
supported_dataset_database_handlers,
|
|
||||||
)
|
|
||||||
|
|
||||||
handler = supported_dataset_database_handlers[
|
|
||||||
dataset_database.graph_dataset_database_handler
|
|
||||||
]
|
|
||||||
return await handler["handler_instance"].delete_dataset(dataset_database)
|
|
||||||
|
|
||||||
db_engine = get_relational_engine()
|
db_engine = get_relational_engine()
|
||||||
try:
|
try:
|
||||||
data = await db_engine.get_all_data_from_table("dataset_database")
|
dataset_databases = await db_engine.get_all_data_from_table("dataset_database")
|
||||||
# Go through each dataset database and delete the graph database
|
# Go through each dataset database and delete the graph database
|
||||||
for data_item in data:
|
for dataset_database in dataset_databases:
|
||||||
await _prune_graph_db(data_item)
|
handler = get_graph_dataset_database_handler(dataset_database)
|
||||||
|
await handler["handler_instance"].delete_dataset(dataset_database)
|
||||||
except (OperationalError, EntityNotFoundError) as e:
|
except (OperationalError, EntityNotFoundError) as e:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Skipping pruning of graph DB. Error when accessing dataset_database table: %s",
|
"Skipping pruning of graph DB. Error when accessing dataset_database table: %s",
|
||||||
|
|
@ -38,22 +33,13 @@ async def prune_graph_databases():
|
||||||
|
|
||||||
|
|
||||||
async def prune_vector_databases():
|
async def prune_vector_databases():
|
||||||
async def _prune_vector_db(dataset_database: DatasetDatabase) -> dict:
|
|
||||||
from cognee.infrastructure.databases.dataset_database_handler.supported_dataset_database_handlers import (
|
|
||||||
supported_dataset_database_handlers,
|
|
||||||
)
|
|
||||||
|
|
||||||
handler = supported_dataset_database_handlers[
|
|
||||||
dataset_database.vector_dataset_database_handler
|
|
||||||
]
|
|
||||||
return await handler["handler_instance"].delete_dataset(dataset_database)
|
|
||||||
|
|
||||||
db_engine = get_relational_engine()
|
db_engine = get_relational_engine()
|
||||||
try:
|
try:
|
||||||
data = await db_engine.get_all_data_from_table("dataset_database")
|
dataset_databases = await db_engine.get_all_data_from_table("dataset_database")
|
||||||
# Go through each dataset database and delete the vector database
|
# Go through each dataset database and delete the vector database
|
||||||
for data_item in data:
|
for dataset_database in dataset_databases:
|
||||||
await _prune_vector_db(data_item)
|
handler = get_vector_dataset_database_handler(dataset_database)
|
||||||
|
await handler["handler_instance"].delete_dataset(dataset_database)
|
||||||
except (OperationalError, EntityNotFoundError) as e:
|
except (OperationalError, EntityNotFoundError) as e:
|
||||||
logger.debug(
|
logger.debug(
|
||||||
"Skipping pruning of vector DB. Error when accessing dataset_database table: %s",
|
"Skipping pruning of vector DB. Error when accessing dataset_database table: %s",
|
||||||
|
|
|
||||||
|
|
@ -1,8 +1,34 @@
|
||||||
|
from cognee.modules.users.models import DatasetDatabase
|
||||||
|
from sqlalchemy import select
|
||||||
|
|
||||||
from cognee.modules.data.models import Dataset
|
from cognee.modules.data.models import Dataset
|
||||||
|
from cognee.infrastructure.databases.utils.get_vector_dataset_database_handler import (
|
||||||
|
get_vector_dataset_database_handler,
|
||||||
|
)
|
||||||
|
from cognee.infrastructure.databases.utils.get_graph_dataset_database_handler import (
|
||||||
|
get_graph_dataset_database_handler,
|
||||||
|
)
|
||||||
from cognee.infrastructure.databases.relational import get_relational_engine
|
from cognee.infrastructure.databases.relational import get_relational_engine
|
||||||
|
|
||||||
|
|
||||||
async def delete_dataset(dataset: Dataset):
|
async def delete_dataset(dataset: Dataset):
|
||||||
db_engine = get_relational_engine()
|
db_engine = get_relational_engine()
|
||||||
|
|
||||||
|
async with db_engine.get_async_session() as session:
|
||||||
|
stmt = select(DatasetDatabase).where(
|
||||||
|
DatasetDatabase.dataset_id == dataset.id,
|
||||||
|
)
|
||||||
|
dataset_database: DatasetDatabase = await session.scalar(stmt)
|
||||||
|
if dataset_database:
|
||||||
|
graph_dataset_database_handler = get_graph_dataset_database_handler(dataset_database)
|
||||||
|
vector_dataset_database_handler = get_vector_dataset_database_handler(dataset_database)
|
||||||
|
await graph_dataset_database_handler["handler_instance"].delete_dataset(
|
||||||
|
dataset_database
|
||||||
|
)
|
||||||
|
await vector_dataset_database_handler["handler_instance"].delete_dataset(
|
||||||
|
dataset_database
|
||||||
|
)
|
||||||
|
# TODO: Remove dataset from pipeline_run_status in Data objects related to dataset as well
|
||||||
|
# This blocks recreation of the dataset with the same name and data after deletion as
|
||||||
|
# it's marked as completed and will be just skipped even though it's empty.
|
||||||
return await db_engine.delete_entity_by_id(dataset.__tablename__, dataset.id)
|
return await db_engine.delete_entity_by_id(dataset.__tablename__, dataset.id)
|
||||||
|
|
|
||||||
76
cognee/tests/test_dataset_delete.py
Normal file
76
cognee/tests/test_dataset_delete.py
Normal file
|
|
@ -0,0 +1,76 @@
|
||||||
|
import os
|
||||||
|
import asyncio
|
||||||
|
import pathlib
|
||||||
|
from uuid import UUID
|
||||||
|
|
||||||
|
import cognee
|
||||||
|
from cognee.shared.logging_utils import setup_logging, ERROR
|
||||||
|
from cognee.modules.data.methods.delete_dataset import delete_dataset
|
||||||
|
from cognee.modules.data.methods.get_dataset import get_dataset
|
||||||
|
from cognee.modules.users.methods import get_default_user
|
||||||
|
|
||||||
|
|
||||||
|
async def main():
|
||||||
|
# Set data and system directory paths
|
||||||
|
data_directory_path = str(
|
||||||
|
pathlib.Path(
|
||||||
|
os.path.join(pathlib.Path(__file__).parent, ".data_storage/test_dataset_delete")
|
||||||
|
).resolve()
|
||||||
|
)
|
||||||
|
cognee.config.data_root_directory(data_directory_path)
|
||||||
|
cognee_directory_path = str(
|
||||||
|
pathlib.Path(
|
||||||
|
os.path.join(pathlib.Path(__file__).parent, ".cognee_system/test_dataset_delete")
|
||||||
|
).resolve()
|
||||||
|
)
|
||||||
|
cognee.config.system_root_directory(cognee_directory_path)
|
||||||
|
|
||||||
|
# Create a clean slate for cognee -- reset data and system state
|
||||||
|
print("Resetting cognee data...")
|
||||||
|
await cognee.prune.prune_data()
|
||||||
|
await cognee.prune.prune_system(metadata=True)
|
||||||
|
print("Data reset complete.\n")
|
||||||
|
|
||||||
|
# cognee knowledge graph will be created based on this text
|
||||||
|
text = """
|
||||||
|
Natural language processing (NLP) is an interdisciplinary
|
||||||
|
subfield of computer science and information retrieval.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Add the text, and make it available for cognify
|
||||||
|
await cognee.add(text, "nlp_dataset")
|
||||||
|
await cognee.add("Quantum computing is the study of quantum computers.", "quantum_dataset")
|
||||||
|
|
||||||
|
# Use LLMs and cognee to create knowledge graph
|
||||||
|
ret_val = await cognee.cognify()
|
||||||
|
user = await get_default_user()
|
||||||
|
|
||||||
|
for val in ret_val:
|
||||||
|
dataset_id = str(val)
|
||||||
|
vector_db_path = os.path.join(
|
||||||
|
cognee_directory_path, "databases", str(user.id), dataset_id + ".lance.db"
|
||||||
|
)
|
||||||
|
graph_db_path = os.path.join(
|
||||||
|
cognee_directory_path, "databases", str(user.id), dataset_id + ".pkl"
|
||||||
|
)
|
||||||
|
|
||||||
|
# Check if databases are properly created and exist before deletion
|
||||||
|
assert os.path.exists(graph_db_path), "Graph database file not found."
|
||||||
|
assert os.path.exists(vector_db_path), "Vector database file not found."
|
||||||
|
|
||||||
|
dataset = await get_dataset(user_id=user.id, dataset_id=UUID(dataset_id))
|
||||||
|
await delete_dataset(dataset)
|
||||||
|
|
||||||
|
# Confirm databases have been deleted
|
||||||
|
assert not os.path.exists(graph_db_path), "Graph database file found."
|
||||||
|
assert not os.path.exists(vector_db_path), "Vector database file found."
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
logger = setup_logging(log_level=ERROR)
|
||||||
|
loop = asyncio.new_event_loop()
|
||||||
|
asyncio.set_event_loop(loop)
|
||||||
|
try:
|
||||||
|
loop.run_until_complete(main())
|
||||||
|
finally:
|
||||||
|
loop.run_until_complete(loop.shutdown_asyncgens())
|
||||||
Loading…
Add table
Reference in a new issue