cognee/alembic/versions/46a6ce2bd2b2_expand_dataset_database_with_json_.py
Igor Ilic 127d9860df
feat: Add dataset database handler info (#1887)
<!-- .github/pull_request_template.md -->

## Description
Add info on dataset database handler used for dataset database

## Type of Change
<!-- Please check the relevant option -->
- [ ] Bug fix (non-breaking change that fixes an issue)
- [ ] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Documentation update
- [ ] Code refactoring
- [ ] Performance improvement
- [ ] Other (please specify):

## Screenshots/Videos (if applicable)
<!-- Add screenshots or videos to help explain your changes -->

## Pre-submission Checklist
<!-- Please check all boxes that apply before submitting your PR -->
- [ ] **I have tested my changes thoroughly before submitting this PR**
- [ ] **This PR contains minimal changes necessary to address the
issue/feature**
- [ ] My code follows the project's coding standards and style
guidelines
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have added necessary documentation (if applicable)
- [ ] All new and existing tests pass
- [ ] I have searched existing PRs to ensure this change hasn't been
submitted already
- [ ] I have linked any relevant issues in the description
- [ ] My commits have clear and descriptive messages

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit

* **New Features**
* Datasets now record their assigned vector and graph database handlers,
allowing per-dataset backend selection.

* **Chores**
  * Database schema expanded to store handler identifiers per dataset.
* Deletion/cleanup processes now use dataset-level handler info for
accurate removal across backends.

* **Tests**
* Tests updated to include and validate the new handler fields in
dataset creation outputs.

<sub>✏️ Tip: You can customize this high-level summary in your review
settings.</sub>
<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-12-12 13:22:03 +01:00

333 lines
12 KiB
Python

"""Expand dataset database with json connection field
Revision ID: 46a6ce2bd2b2
Revises: 76625596c5c3
Create Date: 2025-11-25 17:56:28.938931
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers, used by Alembic.
revision: str = "46a6ce2bd2b2"
down_revision: Union[str, None] = "76625596c5c3"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
graph_constraint_name = "dataset_database_graph_database_name_key"
vector_constraint_name = "dataset_database_vector_database_name_key"
TABLE_NAME = "dataset_database"
def _get_column(inspector, table, name, schema=None):
for col in inspector.get_columns(table, schema=schema):
if col["name"] == name:
return col
return None
def _recreate_table_without_unique_constraint_sqlite(op, insp):
"""
SQLite cannot drop unique constraints on individual columns. We must:
1. Create a new table without the unique constraints.
2. Copy data from the old table.
3. Drop the old table.
4. Rename the new table.
"""
conn = op.get_bind()
# Create new table definition (without unique constraints)
op.create_table(
f"{TABLE_NAME}_new",
sa.Column("owner_id", sa.UUID()),
sa.Column("dataset_id", sa.UUID(), primary_key=True, nullable=False),
sa.Column("vector_database_name", sa.String(), nullable=False),
sa.Column("graph_database_name", sa.String(), nullable=False),
sa.Column("vector_database_provider", sa.String(), nullable=False),
sa.Column("graph_database_provider", sa.String(), nullable=False),
sa.Column(
"vector_dataset_database_handler",
sa.String(),
unique=False,
nullable=False,
server_default="lancedb",
),
sa.Column(
"graph_dataset_database_handler",
sa.String(),
unique=False,
nullable=False,
server_default="kuzu",
),
sa.Column("vector_database_url", sa.String()),
sa.Column("graph_database_url", sa.String()),
sa.Column("vector_database_key", sa.String()),
sa.Column("graph_database_key", sa.String()),
sa.Column(
"graph_database_connection_info",
sa.JSON(),
nullable=False,
server_default=sa.text("'{}'"),
),
sa.Column(
"vector_database_connection_info",
sa.JSON(),
nullable=False,
server_default=sa.text("'{}'"),
),
sa.Column("created_at", sa.DateTime()),
sa.Column("updated_at", sa.DateTime()),
sa.ForeignKeyConstraint(["dataset_id"], ["datasets.id"], ondelete="CASCADE"),
sa.ForeignKeyConstraint(["owner_id"], ["principals.id"], ondelete="CASCADE"),
)
# Copy data into new table
conn.execute(
sa.text(f"""
INSERT INTO {TABLE_NAME}_new
SELECT
owner_id,
dataset_id,
vector_database_name,
graph_database_name,
vector_database_provider,
graph_database_provider,
vector_dataset_database_handler,
graph_dataset_database_handler,
vector_database_url,
graph_database_url,
vector_database_key,
graph_database_key,
COALESCE(graph_database_connection_info, '{{}}'),
COALESCE(vector_database_connection_info, '{{}}'),
created_at,
updated_at
FROM {TABLE_NAME}
""")
)
# Drop old table
op.drop_table(TABLE_NAME)
# Rename new table
op.rename_table(f"{TABLE_NAME}_new", TABLE_NAME)
def _recreate_table_with_unique_constraint_sqlite(op, insp):
"""
SQLite cannot drop unique constraints on individual columns. We must:
1. Create a new table without the unique constraints.
2. Copy data from the old table.
3. Drop the old table.
4. Rename the new table.
"""
conn = op.get_bind()
# Create new table definition (without unique constraints)
op.create_table(
f"{TABLE_NAME}_new",
sa.Column("owner_id", sa.UUID()),
sa.Column("dataset_id", sa.UUID(), primary_key=True, nullable=False),
sa.Column("vector_database_name", sa.String(), nullable=False, unique=True),
sa.Column("graph_database_name", sa.String(), nullable=False, unique=True),
sa.Column("vector_database_provider", sa.String(), nullable=False),
sa.Column("graph_database_provider", sa.String(), nullable=False),
sa.Column(
"vector_dataset_database_handler",
sa.String(),
unique=False,
nullable=False,
server_default="lancedb",
),
sa.Column(
"graph_dataset_database_handler",
sa.String(),
unique=False,
nullable=False,
server_default="kuzu",
),
sa.Column("vector_database_url", sa.String()),
sa.Column("graph_database_url", sa.String()),
sa.Column("vector_database_key", sa.String()),
sa.Column("graph_database_key", sa.String()),
sa.Column(
"graph_database_connection_info",
sa.JSON(),
nullable=False,
server_default=sa.text("'{}'"),
),
sa.Column(
"vector_database_connection_info",
sa.JSON(),
nullable=False,
server_default=sa.text("'{}'"),
),
sa.Column("created_at", sa.DateTime()),
sa.Column("updated_at", sa.DateTime()),
sa.ForeignKeyConstraint(["dataset_id"], ["datasets.id"], ondelete="CASCADE"),
sa.ForeignKeyConstraint(["owner_id"], ["principals.id"], ondelete="CASCADE"),
)
# Copy data into new table
conn.execute(
sa.text(f"""
INSERT INTO {TABLE_NAME}_new
SELECT
owner_id,
dataset_id,
vector_database_name,
graph_database_name,
vector_database_provider,
graph_database_provider,
vector_dataset_database_handler,
graph_dataset_database_handler,
vector_database_url,
graph_database_url,
vector_database_key,
graph_database_key,
COALESCE(graph_database_connection_info, '{{}}'),
COALESCE(vector_database_connection_info, '{{}}'),
created_at,
updated_at
FROM {TABLE_NAME}
""")
)
# Drop old table
op.drop_table(TABLE_NAME)
# Rename new table
op.rename_table(f"{TABLE_NAME}_new", TABLE_NAME)
def upgrade() -> None:
conn = op.get_bind()
insp = sa.inspect(conn)
unique_constraints = insp.get_unique_constraints(TABLE_NAME)
vector_database_connection_info_column = _get_column(
insp, "dataset_database", "vector_database_connection_info"
)
if not vector_database_connection_info_column:
op.add_column(
"dataset_database",
sa.Column(
"vector_database_connection_info",
sa.JSON(),
unique=False,
nullable=False,
server_default=sa.text("'{}'"),
),
)
vector_dataset_database_handler = _get_column(
insp, "dataset_database", "vector_dataset_database_handler"
)
if not vector_dataset_database_handler:
# Add LanceDB as the default graph dataset database handler
op.add_column(
"dataset_database",
sa.Column(
"vector_dataset_database_handler",
sa.String(),
unique=False,
nullable=False,
server_default="lancedb",
),
)
graph_database_connection_info_column = _get_column(
insp, "dataset_database", "graph_database_connection_info"
)
if not graph_database_connection_info_column:
op.add_column(
"dataset_database",
sa.Column(
"graph_database_connection_info",
sa.JSON(),
unique=False,
nullable=False,
server_default=sa.text("'{}'"),
),
)
graph_dataset_database_handler = _get_column(
insp, "dataset_database", "graph_dataset_database_handler"
)
if not graph_dataset_database_handler:
# Add Kuzu as the default graph dataset database handler
op.add_column(
"dataset_database",
sa.Column(
"graph_dataset_database_handler",
sa.String(),
unique=False,
nullable=False,
server_default="kuzu",
),
)
with op.batch_alter_table("dataset_database", schema=None) as batch_op:
# Drop the unique constraint to make unique=False
graph_constraint_to_drop = None
for uc in unique_constraints:
# Check if the constraint covers ONLY the target column
if uc["name"] == graph_constraint_name:
graph_constraint_to_drop = uc["name"]
break
vector_constraint_to_drop = None
for uc in unique_constraints:
# Check if the constraint covers ONLY the target column
if uc["name"] == vector_constraint_name:
vector_constraint_to_drop = uc["name"]
break
if (
vector_constraint_to_drop
and graph_constraint_to_drop
and op.get_context().dialect.name == "postgresql"
):
# PostgreSQL
batch_op.drop_constraint(graph_constraint_name, type_="unique")
batch_op.drop_constraint(vector_constraint_name, type_="unique")
if op.get_context().dialect.name == "sqlite":
conn = op.get_bind()
# Fun fact: SQLite has hidden auto indexes for unique constraints that can't be dropped or accessed directly
# So we need to check for them and drop them by recreating the table (altering column also won't work)
result = conn.execute(sa.text("PRAGMA index_list('dataset_database')"))
rows = result.fetchall()
unique_auto_indexes = [row for row in rows if row[3] == "u"]
for row in unique_auto_indexes:
result = conn.execute(sa.text(f"PRAGMA index_info('{row[1]}')"))
index_info = result.fetchall()
if index_info[0][2] == "vector_database_name":
# In case a unique index exists on vector_database_name, drop it and the graph_database_name one
_recreate_table_without_unique_constraint_sqlite(op, insp)
def downgrade() -> None:
conn = op.get_bind()
insp = sa.inspect(conn)
if op.get_context().dialect.name == "sqlite":
_recreate_table_with_unique_constraint_sqlite(op, insp)
elif op.get_context().dialect.name == "postgresql":
with op.batch_alter_table("dataset_database", schema=None) as batch_op:
# Re-add the unique constraint to return to unique=True
batch_op.create_unique_constraint(graph_constraint_name, ["graph_database_name"])
with op.batch_alter_table("dataset_database", schema=None) as batch_op:
# Re-add the unique constraint to return to unique=True
batch_op.create_unique_constraint(vector_constraint_name, ["vector_database_name"])
op.drop_column("dataset_database", "vector_database_connection_info")
op.drop_column("dataset_database", "graph_database_connection_info")
op.drop_column("dataset_database", "vector_dataset_database_handler")
op.drop_column("dataset_database", "graph_dataset_database_handler")