feat: Add automatic Kuzu migration as exception handling attempt

This commit is contained in:
Igor Ilic 2025-07-23 19:55:55 +02:00
parent ce312fb397
commit 8a7c6bfef0
2 changed files with 314 additions and 5 deletions

View file

@ -72,11 +72,36 @@ class KuzuAdapter(GraphDBInterface):
run_sync(file_storage.ensure_directory_exists())
self.db = Database(
self.db_path,
buffer_pool_size=256 * 1024 * 1024, # 256MB buffer pool
max_db_size=1024 * 1024 * 1024,
)
try:
self.db = Database(
self.db_path,
buffer_pool_size=256 * 1024 * 1024, # 256MB buffer pool
max_db_size=1024 * 1024 * 1024,
)
except RuntimeError:
from .kuzu_migrate import read_kuzu_storage_version
import kuzu
kuzu_db_version = read_kuzu_storage_version(self.db_path)
if kuzu_db_version == "0.9.0" and kuzu_db_version != kuzu.__version__:
# TODO: Add user prompt to ask for permission to attempt migration.
# TODO: Write migration script that will handle all user graph databases in multi-user mode
# Try to migrate kuzu database to latest version
from .kuzu_migrate import kuzu_migration
kuzu_migration(
new_db=self.db_path + "new",
old_db=self.db_path,
overwrite=True,
delete_old=True,
new_version=kuzu.__version__,
old_version=kuzu_db_version,
)
self.db = Database(
self.db_path,
buffer_pool_size=256 * 1024 * 1024, # 256MB buffer pool
max_db_size=1024 * 1024 * 1024,
)
self.db.init_database()
self.connection = Connection(self.db)

View file

@ -0,0 +1,284 @@
#!/usr/bin/env python3
"""
Kuzu Database Migration Script
This script migrates Kuzu databases between different versions by:
1. Setting up isolated Python environments for each Kuzu version
2. Exporting data from the source database using the old version
3. Importing data into the target database using the new version
4. If overwrite is enabled target database will replace source database and source database will have the prefix _old
5. If delete-old is enabled target database will be renamed to source database and source database will be deleted
The script automatically handles:
- Environment setup (creates virtual environments as needed)
- Export/import validation
- Error handling and reporting
Usage Examples:
# Basic migration from 0.9.0 to 0.11.0
python kuzu_migrate.py --old-version 0.9.0 --new-version 0.11.0 --old-db /path/to/old/database --new-db /path/to/new/database
Requirements:
- Python 3.7+
- Internet connection (to download Kuzu packages)
- Sufficient disk space for virtual environments and temporary exports
Notes:
- Can only be used to migrate to newer Kuzu versions, from 0.11.0 onwards
"""
import tempfile
import sys
import struct
import shutil
import subprocess
import argparse
import os
script_dir = os.path.dirname(os.path.abspath(__file__))
kuzu_envs_dir = os.path.join(script_dir, ".kuzu_envs")
kuzu_version_mapping = {37: "0.9.0", 38: "0.10.0", 39: "0.11.0"}
def read_kuzu_storage_version(kuzu_db_path: str) -> int:
"""
Reads the Kùzu storage version code from the first catalog.bin file bytes.
:param kuzu_db_path: Path to the Kuzu database file/directory.
:return: Storage version code as an integer.
"""
if os.path.isdir(kuzu_db_path):
kuzu_version_file_path = os.path.join(kuzu_db_path, "catalog.kz")
if not os.path.isfile(kuzu_version_file_path):
raise FileExistsError("Kuzu catalog.kz file does not exist")
else:
kuzu_version_file_path = kuzu_db_path
with open(kuzu_version_file_path, "rb") as f:
# Skip the 3-byte magic "KUZ" and one byte of padding
f.seek(4)
# Read the next 8 bytes as a little-endian unsigned 64-bit integer
data = f.read(8)
if len(data) < 8:
raise ValueError(
f"File '{kuzu_version_file_path}' does not contain a storage version code."
)
version_code = struct.unpack("<Q", data)[0]
if kuzu_version_mapping.get(version_code):
return kuzu_version_mapping[version_code]
else:
ValueError("Could not map version_code to proper Kuzu version.")
def ensure_env(version: str) -> str:
"""
Create (if needed) a venv at .kuzu_envs/{version} and install kuzu=={version}.
Returns the path to the venv's python executable.
"""
# venv base under the script directory
base = os.path.join(kuzu_envs_dir, version)
py_bin = os.path.join(base, "bin", "python")
# If environment already exists clean it
if os.path.isfile(py_bin):
shutil.rmtree(base)
print(f"→ Setting up venv for Kùzu {version}...", file=sys.stderr)
# Create venv
subprocess.run([sys.executable, "-m", "venv", base], check=True)
# Install the specific Kùzu version
subprocess.run([py_bin, "-m", "pip", "install", "--upgrade", "pip"], check=True)
subprocess.run([py_bin, "-m", "pip", "install", f"kuzu=={version}"], check=True)
return py_bin
def run_migration_step(python_exe: str, db_path: str, cypher: str):
"""
Uses the given python_exe to execute a short snippet that
connects to the Kùzu database and runs a Cypher command.
"""
snippet = f"""
import kuzu
db = kuzu.Database(r"{db_path}")
conn = kuzu.Connection(db)
conn.execute(r\"\"\"{cypher}\"\"\")
"""
proc = subprocess.run([python_exe, "-c", snippet], capture_output=True, text=True)
if proc.returncode != 0:
print(f"[ERROR] {cypher} failed:\n{proc.stderr}", file=sys.stderr)
sys.exit(proc.returncode)
def kuzu_migration(new_db, old_db, overwrite, delete_old, new_version, old_version=None):
"""
Main migration function that handles the complete migration process.
"""
# If version of old kuzu db is not provided try to determine it based on file info
if not old_version:
old_version = read_kuzu_storage_version(old_db)
# Check if old database exists
if not os.path.exists(old_db):
print(f"Source database '{old_db}' does not exist.", file=sys.stderr)
sys.exit(1)
# Prepare target - ensure parent directory exists but remove target if it exists
parent_dir = os.path.dirname(new_db)
if parent_dir:
os.makedirs(parent_dir, exist_ok=True)
if os.path.exists(new_db):
raise FileExistsError(
"File already exists at new database location, remove file or change new database file path to continue"
)
# Set up environments
print(f"Setting up Kuzu {old_version} environment...", file=sys.stderr)
old_py = ensure_env(old_version)
print(f"Setting up Kuzu {new_version} environment...", file=sys.stderr)
new_py = ensure_env(new_version)
with tempfile.TemporaryDirectory() as export_dir:
export_file = os.path.join(export_dir, "kuzu_export")
print(f"Exporting old DB → {export_dir}", file=sys.stderr)
run_migration_step(old_py, old_db, f"EXPORT DATABASE '{export_file}'")
print("Export complete.", file=sys.stderr)
# Check if export files were created and have content
schema_file = os.path.join(export_file, "schema.cypher")
if not os.path.exists(schema_file) or os.path.getsize(schema_file) == 0:
raise ValueError(f"Schema file not found: {schema_file}")
print(f"Importing into new DB at {new_db}", file=sys.stderr)
run_migration_step(new_py, new_db, f"IMPORT DATABASE '{export_file}'")
print("Import complete.", file=sys.stderr)
# Rename new kuzu database to old kuzu database name if enabled
if overwrite or delete_old:
rename_databases(old_db, new_db, delete_old)
# Clean up virtual environments after successful migration
cleanup_environments()
print("✅ Migration finished successfully!")
def rename_databases(old_db: str, new_db: str, delete_old: bool):
"""
When overwrite is enabled, back up the original old_db (file with .lock and .wal or directory)
by renaming it to *_old, and replace it with the newly imported new_db files.
When delete_old is enabled replace the old database with the new one and delete old database
"""
base_dir = os.path.dirname(old_db)
name = os.path.basename(old_db.rstrip(os.sep))
backup_base = os.path.join(base_dir, f"{name}_old")
if os.path.isfile(old_db):
# File-based database: handle main file and accompanying lock/WAL
for ext in ["", ".lock", ".wal"]:
src = old_db + ext
dst = backup_base + ext
if os.path.exists(src):
if delete_old:
os.remove(src)
else:
os.rename(src, dst)
print(f"Renamed '{src}' to '{dst}'", file=sys.stderr)
elif os.path.isdir(old_db):
# Directory-based Kuzu database
backup_dir = backup_base
if delete_old:
shutil.rmtree(old_db)
else:
os.rename(old_db, backup_dir)
print(f"Renamed directory '{old_db}' to '{backup_dir}'", file=sys.stderr)
else:
print(f"Original database path '{old_db}' not found for renaming.", file=sys.stderr)
sys.exit(1)
# Now move new files into place
for ext in ["", ".lock", ".wal"]:
src_new = new_db + ext
dst_new = os.path.join(base_dir, name + ext)
if os.path.exists(src_new):
os.rename(src_new, dst_new)
print(f"Renamed '{src_new}' to '{dst_new}'", file=sys.stderr)
def cleanup_environments():
"""
Clean up the .kuzu_envs directory after migration is complete.
This removes the temporary virtual environments to save disk space.
"""
if os.path.exists(kuzu_envs_dir):
try:
shutil.rmtree(kuzu_envs_dir)
print(f"🧹 Cleaned up virtual environments directory: {kuzu_envs_dir}", file=sys.stderr)
except Exception as e:
print(f"⚠️ Warning: Could not clean up {kuzu_envs_dir}: {e}", file=sys.stderr)
else:
print(f"📁 Virtual environments directory not found: {kuzu_envs_dir}", file=sys.stderr)
def main():
p = argparse.ArgumentParser(
description="Migrate Kùzu DB via PyPI versions",
epilog="""
Examples:
%(prog)s --old-version 0.9.0 --new-version 0.11.0 \\
--old-db /path/to/old/db --new-db /path/to/new/db --overwrite
Note: This script will create virtual environments in .kuzu_envs/ directory
to isolate different Kuzu versions.
""",
formatter_class=argparse.RawDescriptionHelpFormatter,
)
p.add_argument(
"--old-version",
required=False,
default=None,
help="Source Kuzu version (e.g., 0.9.0). If not provided automatic kuzu version detection will be attempted.",
)
p.add_argument("--new-version", required=True, help="Target Kuzu version (e.g., 0.11.0)")
p.add_argument("--old-db", required=True, help="Path to source database directory")
p.add_argument(
"--new-db",
required=True,
help="Path to target database directory, it can't be the same path as the old database. Use the overwrite flag if you want to replace the old database with the new one.",
)
p.add_argument(
"--overwrite",
action="store_true",
default=False,
help="Rename new-db to the old-db name and location, keeps old-db as backup if delete-old is not True",
)
p.add_argument(
"--delete-old",
action="store_true",
default=False,
help="When overwrite and delete-old is True old-db will not be stored as backup",
)
args = p.parse_args()
print(
f"🔄 Migrating Kuzu database from {args.old_version} to {args.new_version}", file=sys.stderr
)
print(f"📂 Source: {args.old_db}", file=sys.stderr)
print(f"📂 Target: {args.new_db}", file=sys.stderr)
print("", file=sys.stderr)
kuzu_migration(
new_db=args.new_db,
old_db=args.old_db,
overwrite=args.overwrite,
delete_old=args.delete_old,
new_version=args.new_version,
old_version=args.old_version,
)
if __name__ == "__main__":
main()