From 805d14726655035a16f55436248f599c8d7c84a9 Mon Sep 17 00:00:00 2001 From: Timmy Date: Sun, 10 Aug 2025 18:39:20 +0100 Subject: [PATCH] feat(kuzu): enable S3-aware Kuzu migration and auto-migrate in adapter\n\n- Add S3 staging, version read from S3 catalog.kz, and S3 rename (copy+rm)\n- Integrate auto-migration in KuzuAdapter for S3/local paths\n- Add unit tests for S3 migration helpers and adapter auto-migration\n\nI affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin Signed-off-by: Timmy --- .../databases/graph/kuzu/adapter.py | 67 +++-- .../databases/graph/kuzu/kuzu_migrate.py | 233 ++++++++++++++---- .../graph/kuzu/test_adapter_s3_migration.py | 191 ++++++++++++++ .../graph/kuzu/test_kuzu_migrate_s3.py | 195 +++++++++++++++ 4 files changed, 621 insertions(+), 65 deletions(-) create mode 100644 cognee/tests/unit/infrastructure/databases/graph/kuzu/test_adapter_s3_migration.py create mode 100644 cognee/tests/unit/infrastructure/databases/graph/kuzu/test_kuzu_migrate_s3.py diff --git a/cognee/infrastructure/databases/graph/kuzu/adapter.py b/cognee/infrastructure/databases/graph/kuzu/adapter.py index 4262178be..7d201115e 100644 --- a/cognee/infrastructure/databases/graph/kuzu/adapter.py +++ b/cognee/infrastructure/databases/graph/kuzu/adapter.py @@ -1,26 +1,27 @@ """Adapter for Kuzu graph database.""" -import os -import json import asyncio +import json +import os import tempfile +from concurrent.futures import ThreadPoolExecutor +from contextlib import asynccontextmanager +from datetime import datetime, timezone +from typing import Any, Dict, List, Optional, Tuple, Type, Union from uuid import UUID + from kuzu import Connection from kuzu.database import Database -from datetime import datetime, timezone -from contextlib import asynccontextmanager -from concurrent.futures import ThreadPoolExecutor -from typing import Dict, Any, List, Union, Optional, Tuple, Type -from cognee.shared.logging_utils import get_logger -from cognee.infrastructure.utils.run_sync import run_sync -from cognee.infrastructure.files.storage import get_file_storage from cognee.infrastructure.databases.graph.graph_db_interface import ( GraphDBInterface, record_graph_changes, ) from cognee.infrastructure.engine import DataPoint +from cognee.infrastructure.files.storage import get_file_storage +from cognee.infrastructure.utils.run_sync import run_sync from cognee.modules.storage.utils import JSONEncoder +from cognee.shared.logging_utils import get_logger logger = get_logger() @@ -47,16 +48,44 @@ class KuzuAdapter(GraphDBInterface): """Initialize the Kuzu database connection and schema.""" try: if "s3://" in self.db_path: + # Stage S3 DB to a local temp file with tempfile.NamedTemporaryFile(mode="w", delete=False) as temp_file: self.temp_graph_file = temp_file.name run_sync(self.pull_from_s3()) - self.db = Database( - self.temp_graph_file, - buffer_pool_size=2048 * 1024 * 1024, # 2048MB buffer pool - max_db_size=4096 * 1024 * 1024, - ) + # Try to open; if it fails due to version mismatch, migrate the temp copy and push back + try: + self.db = Database( + self.temp_graph_file, + buffer_pool_size=2048 * 1024 * 1024, # 2048MB buffer pool + max_db_size=4096 * 1024 * 1024, + ) + except RuntimeError: + import kuzu + + from .kuzu_migrate import kuzu_migration, read_kuzu_storage_version + + kuzu_db_version = read_kuzu_storage_version(self.temp_graph_file) + if ( + kuzu_db_version == "0.9.0" or kuzu_db_version == "0.8.2" + ) and kuzu_db_version != str(kuzu.__version__): # ensure string comparison + kuzu_migration( + new_db=self.temp_graph_file + "_new", + old_db=self.temp_graph_file, + new_version=str(kuzu.__version__), # pass str to satisfy types + old_version=kuzu_db_version, + overwrite=True, + ) + # Push migrated DB back to S3 + run_sync(self.push_to_s3()) + + # Retry opening after potential migration + self.db = Database( + self.temp_graph_file, + buffer_pool_size=2048 * 1024 * 1024, + max_db_size=4096 * 1024 * 1024, + ) else: # Ensure the parent directory exists before creating the database db_dir = os.path.dirname(self.db_path) @@ -79,20 +108,21 @@ class KuzuAdapter(GraphDBInterface): max_db_size=4096 * 1024 * 1024, ) except RuntimeError: - from .kuzu_migrate import read_kuzu_storage_version import kuzu + from .kuzu_migrate import read_kuzu_storage_version + kuzu_db_version = read_kuzu_storage_version(self.db_path) if ( kuzu_db_version == "0.9.0" or kuzu_db_version == "0.8.2" - ) and kuzu_db_version != kuzu.__version__: + ) and kuzu_db_version != str(kuzu.__version__): # Try to migrate kuzu database to latest version from .kuzu_migrate import kuzu_migration kuzu_migration( new_db=self.db_path + "_new", old_db=self.db_path, - new_version=kuzu.__version__, + new_version=str(kuzu.__version__), old_version=kuzu_db_version, overwrite=True, ) @@ -212,7 +242,8 @@ class KuzuAdapter(GraphDBInterface): """Convert a raw node result (with JSON properties) into a dictionary.""" if data.get("properties"): try: - props = json.loads(data["properties"]) + # Parse JSON properties into a dict + props: Dict[str, Any] = json.loads(data["properties"]) # Remove the JSON field and merge its contents data.pop("properties") data.update(props) diff --git a/cognee/infrastructure/databases/graph/kuzu/kuzu_migrate.py b/cognee/infrastructure/databases/graph/kuzu/kuzu_migrate.py index 3f930d91f..7cfddcf4f 100644 --- a/cognee/infrastructure/databases/graph/kuzu/kuzu_migrate.py +++ b/cognee/infrastructure/databases/graph/kuzu/kuzu_migrate.py @@ -27,13 +27,28 @@ Notes: - Can only be used to migrate to newer Kuzu versions, from 0.11.0 onwards """ -import tempfile -import sys -import struct -import shutil -import subprocess import argparse import os +import shutil +import struct +import subprocess +import sys +import tempfile +from typing import Any, Optional + + +# Lazy-import s3fs via our storage adapter only when needed, so local-only runs +# don't require S3 credentials or dependencies at import time. +def _is_s3_path(path: str) -> bool: + return path.startswith("s3://") + + +def _get_s3_client() -> Any: # Returns configured s3fs client via project storage adapter + from cognee.infrastructure.files.storage.S3FileStorage import S3FileStorage + + storage: Any = S3FileStorage("") + client: Any = storage.s3 # type: ignore[attr-defined] + return client kuzu_version_mapping = { @@ -46,38 +61,56 @@ kuzu_version_mapping = { } -def read_kuzu_storage_version(kuzu_db_path: str) -> int: +def read_kuzu_storage_version(kuzu_db_path: str) -> str: """ - Reads the Kùzu storage version code from the first catalog.bin file bytes. + Read the Kuzu storage version from the first bytes of catalog.kz and map it + to a human-readable Kuzu semantic version string (e.g. "0.9.0"). - :param kuzu_db_path: Path to the Kuzu database file/directory. - :return: Storage version code as an integer. + :param kuzu_db_path: Path/URI (local or s3://) to the Kuzu database file/directory. + :return: Semantic version string (e.g. "0.9.0"). """ - if os.path.isdir(kuzu_db_path): - kuzu_version_file_path = os.path.join(kuzu_db_path, "catalog.kz") - if not os.path.isfile(kuzu_version_file_path): - raise FileExistsError("Kuzu catalog.kz file does not exist") + if _is_s3_path(kuzu_db_path): + s3 = _get_s3_client() + # Determine whether the remote path is a directory or file + version_key = kuzu_db_path + try: + if s3.isdir(kuzu_db_path): + version_key = kuzu_db_path.rstrip("/") + "/catalog.kz" + # Open directly from S3 without downloading the entire DB + with s3.open(version_key, "rb") as f: + f.seek(4) + data = f.read(8) + except FileNotFoundError: + raise FileExistsError("Kuzu catalog.kz file does not exist on S3") else: - kuzu_version_file_path = kuzu_db_path + if os.path.isdir(kuzu_db_path): + kuzu_version_file_path = os.path.join(kuzu_db_path, "catalog.kz") + if not os.path.isfile(kuzu_version_file_path): + raise FileExistsError("Kuzu catalog.kz file does not exist") + else: + kuzu_version_file_path = kuzu_db_path - with open(kuzu_version_file_path, "rb") as f: - # Skip the 3-byte magic "KUZ" and one byte of padding - f.seek(4) - # Read the next 8 bytes as a little-endian unsigned 64-bit integer - data = f.read(8) - if len(data) < 8: - raise ValueError( - f"File '{kuzu_version_file_path}' does not contain a storage version code." - ) - version_code = struct.unpack(" str: +def ensure_env(version: str, export_dir: str) -> str: """ Create (if needed) a venv at .kuzu_envs/{version} and install kuzu=={version}. Returns the path to the venv's python executable. @@ -119,7 +152,14 @@ conn.execute(r\"\"\"{cypher}\"\"\") sys.exit(proc.returncode) -def kuzu_migration(new_db, old_db, new_version, old_version=None, overwrite=None, delete_old=None): +def kuzu_migration( + new_db: str, + old_db: str, + new_version: str, + old_version: Optional[str] = None, + overwrite: Optional[bool] = None, + delete_old: Optional[bool] = None, +) -> None: """ Main migration function that handles the complete migration process. """ @@ -131,23 +171,52 @@ def kuzu_migration(new_db, old_db, new_version, old_version=None, overwrite=None if not old_version: old_version = read_kuzu_storage_version(old_db) - # Check if old database exists - if not os.path.exists(old_db): - print(f"Source database '{old_db}' does not exist.", file=sys.stderr) - sys.exit(1) + # Check if old database exists (local or S3) + if _is_s3_path(old_db): + s3 = _get_s3_client() + if not (s3.exists(old_db) or s3.exists(old_db.rstrip("/") + "/")): + print(f"Source database '{old_db}' does not exist.", file=sys.stderr) + sys.exit(1) + else: + if not os.path.exists(old_db): + print(f"Source database '{old_db}' does not exist.", file=sys.stderr) + sys.exit(1) # Prepare target - ensure parent directory exists but remove target if it exists parent_dir = os.path.dirname(new_db) - if parent_dir: - os.makedirs(parent_dir, exist_ok=True) - - if os.path.exists(new_db): - raise FileExistsError( - "File already exists at new database location, remove file or change new database file path to continue" - ) + if _is_s3_path(new_db): + # For S3 we don't create directories locally; just ensure the key doesn't already exist + s3 = _get_s3_client() + if s3.exists(new_db) or s3.exists(new_db.rstrip("/") + "/"): + raise FileExistsError( + "File already exists at new database location on S3; remove it or change new database path to continue" + ) + else: + if parent_dir: + os.makedirs(parent_dir, exist_ok=True) + if os.path.exists(new_db): + raise FileExistsError( + "File already exists at new database location, remove file or change new database file path to continue" + ) # Use temp directory for all processing, it will be cleaned up after with statement with tempfile.TemporaryDirectory() as export_dir: + is_old_s3 = _is_s3_path(old_db) + is_new_s3 = _is_s3_path(new_db) + + # If old DB is on S3, download it locally first. + local_old_db = old_db + local_new_db = new_db + if is_old_s3: + s3 = _get_s3_client() + local_old_db = os.path.join(export_dir, "old_kuzu_db") + # Download either a file or a directory recursively + print(f"⬇️ Downloading old DB from S3 → {local_old_db}", file=sys.stderr) + s3.get(old_db, local_old_db, recursive=True) + + if is_new_s3: + # Always stage new DB locally, then upload after migration + local_new_db = os.path.join(export_dir, "new_kuzu_db") # Set up environments print(f"Setting up Kuzu {old_version} environment...", file=sys.stderr) old_py = ensure_env(old_version, export_dir) @@ -156,7 +225,7 @@ def kuzu_migration(new_db, old_db, new_version, old_version=None, overwrite=None export_file = os.path.join(export_dir, "kuzu_export") print(f"Exporting old DB → {export_dir}", file=sys.stderr) - run_migration_step(old_py, old_db, f"EXPORT DATABASE '{export_file}'") + run_migration_step(old_py, local_old_db, f"EXPORT DATABASE '{export_file}'") print("Export complete.", file=sys.stderr) # Check if export files were created and have content @@ -164,17 +233,36 @@ def kuzu_migration(new_db, old_db, new_version, old_version=None, overwrite=None if not os.path.exists(schema_file) or os.path.getsize(schema_file) == 0: raise ValueError(f"Schema file not found: {schema_file}") - print(f"Importing into new DB at {new_db}", file=sys.stderr) - run_migration_step(new_py, new_db, f"IMPORT DATABASE '{export_file}'") + print(f"Importing into new DB at {local_new_db}", file=sys.stderr) + run_migration_step(new_py, local_new_db, f"IMPORT DATABASE '{export_file}'") print("Import complete.", file=sys.stderr) - # Rename new kuzu database to old kuzu database name if enabled + # If the target is S3, upload the migrated DB now + if is_new_s3: + # Remove kuzu lock from migrated DB before upload if present + lock_file = local_new_db + ".lock" + if os.path.exists(lock_file): + os.remove(lock_file) + + print(f"⬆️ Uploading new DB to S3: {new_db}", file=sys.stderr) + s3 = _get_s3_client() + s3.put(local_new_db, new_db, recursive=True) + + # Normalize flags + overwrite = bool(overwrite) + delete_old = bool(delete_old) + + # Rename/move results into place if requested if overwrite or delete_old: - # Remove kuzu lock from migrated DB - lock_file = new_db + ".lock" - if os.path.exists(lock_file): - os.remove(lock_file) - rename_databases(old_db, old_version, new_db, delete_old) + if _is_s3_path(new_db) or _is_s3_path(old_db): + # S3-aware rename + _s3_rename_databases(old_db, old_version, new_db, delete_old) + else: + # Remove kuzu lock from migrated DB + lock_file = new_db + ".lock" + if os.path.exists(lock_file): + os.remove(lock_file) + rename_databases(old_db, old_version, new_db, delete_old) print("✅ Kuzu graph database migration finished successfully!") @@ -224,6 +312,57 @@ def rename_databases(old_db: str, old_version: str, new_db: str, delete_old: boo print(f"Renamed '{src_new}' to '{dst_new}'", file=sys.stderr) +def _s3_rename_databases(old_db: str, old_version: str, new_db: str, delete_old: bool): + """ + Perform S3-equivalent of rename_databases: optionally back up the original old_db + to *_old_, replace it with the new_db contents, and clean up. + + This function handles both file-based and directory-based Kuzu databases by using + recursive copy and remove operations provided by s3fs. + """ + s3 = _get_s3_client() + + # Normalize paths (keep s3:// URIs as they are; s3fs supports them) + def _isdir(p: str) -> bool: + try: + return s3.isdir(p) + except FileNotFoundError: + return False + + def _isfile(p: str) -> bool: + try: + return s3.isfile(p) + except FileNotFoundError: + return False + + base_dir = os.path.dirname(old_db.rstrip("/")) + name = os.path.basename(old_db.rstrip("/")) + backup_database_name = f"{name}_old_" + old_version.replace(".", "_") + backup_base = base_dir + "/" + backup_database_name + + # Back up or delete the original old_db + if _isfile(old_db): + if not delete_old: + s3.copy(old_db, backup_base, recursive=True) + print(f"Copied '{old_db}' to '{backup_base}' on S3", file=sys.stderr) + s3.rm(old_db, recursive=True) + elif _isdir(old_db): + if not delete_old: + s3.copy(old_db, backup_base, recursive=True) + print(f"Copied directory '{old_db}' to '{backup_base}' on S3", file=sys.stderr) + s3.rm(old_db, recursive=True) + else: + print(f"Original database path '{old_db}' not found on S3 for renaming.", file=sys.stderr) + sys.exit(1) + + # Move new into place under the old name + target_path = base_dir + "/" + name + s3.copy(new_db, target_path, recursive=True) + print(f"Copied '{new_db}' to '{target_path}' on S3", file=sys.stderr) + # Remove the staging 'new_db' key + s3.rm(new_db, recursive=True) + + def main(): p = argparse.ArgumentParser( description="Migrate Kùzu DB via PyPI versions", diff --git a/cognee/tests/unit/infrastructure/databases/graph/kuzu/test_adapter_s3_migration.py b/cognee/tests/unit/infrastructure/databases/graph/kuzu/test_adapter_s3_migration.py new file mode 100644 index 000000000..749f2df1e --- /dev/null +++ b/cognee/tests/unit/infrastructure/databases/graph/kuzu/test_adapter_s3_migration.py @@ -0,0 +1,191 @@ +import importlib.util +import os +import sys +import types +from types import ModuleType + + +class _DBOpenError(RuntimeError): + pass + + +class _FakeDatabase: + """Fake kuzu.Database that fails first, then succeeds.""" + + calls = 0 + + def __init__(self, path: str, **kwargs): + _FakeDatabase.calls += 1 + if _FakeDatabase.calls == 1: + raise _DBOpenError("version mismatch") + + def init_database(self): + pass + + +class _FakeConnection: + def __init__(self, db): + pass + + def execute(self, query: str, params=None): + class _Res: + def has_next(self): + return False + + def get_next(self): + return [] + + return _Res() + + +def _install_stub(name: str, module: ModuleType | None = None) -> ModuleType: + mod = module or ModuleType(name) + # Mark as package so submodule imports succeed when needed + if not hasattr(mod, "__path__"): + mod.__path__ = [] # type: ignore[attr-defined] + sys.modules[name] = mod + return mod + + +def _find_repo_root(start_path: str) -> str: + """Walk up directories until we find pyproject.toml (repo root).""" + cur = os.path.abspath(start_path) + while True: + if os.path.exists(os.path.join(cur, "pyproject.toml")): + return cur + parent = os.path.dirname(cur) + if parent == cur: + raise RuntimeError("Could not locate repository root from: " + start_path) + cur = parent + + +def _load_adapter_with_stubs(monkeypatch): + # Provide fake 'kuzu' and submodules used by adapter imports + kuzu_mod = _install_stub("kuzu") + kuzu_mod.__dict__["__version__"] = "0.11.0" + + # Placeholders to satisfy adapter's "from kuzu import Connection" and "from kuzu.database import Database" + class _PlaceholderConn: + pass + + kuzu_mod.Connection = _PlaceholderConn + kuzu_db_mod = _install_stub("kuzu.database") + + class _PlaceholderDB: + pass + + kuzu_db_mod.Database = _PlaceholderDB + + # Create minimal stub tree for required cognee imports to avoid executing package __init__ + root = _install_stub("cognee") + infra = _install_stub("cognee.infrastructure") + databases = _install_stub("cognee.infrastructure.databases") + graph = _install_stub("cognee.infrastructure.databases.graph") + kuzu_pkg = _install_stub("cognee.infrastructure.databases.graph.kuzu") + + # graph_db_interface stub + gdi_mod = _install_stub("cognee.infrastructure.databases.graph.graph_db_interface") + + class _GraphDBInterface: # bare minimum + pass + + def record_graph_changes(fn): + return fn + + gdi_mod.GraphDBInterface = _GraphDBInterface + gdi_mod.record_graph_changes = record_graph_changes + + # engine.DataPoint stub + engine_mod = _install_stub("cognee.infrastructure.engine") + + class _DataPoint: + def __init__(self, **kwargs): + self.__dict__.update(kwargs) + + engine_mod.DataPoint = _DataPoint + + # files.storage.get_file_storage stub + files_storage_pkg = _install_stub("cognee.infrastructure.files") + storage_pkg = _install_stub("cognee.infrastructure.files.storage") + storage_pkg.get_file_storage = lambda path: types.SimpleNamespace( + ensure_directory_exists=lambda: None + ) + + # utils.run_sync stub + utils_pkg = _install_stub("cognee.infrastructure.utils") + run_sync_mod = _install_stub("cognee.infrastructure.utils.run_sync") + run_sync_mod.run_sync = lambda coro: None + + # modules.storage.utils JSONEncoder stub + modules_pkg = _install_stub("cognee.modules") + storage_pkg2 = _install_stub("cognee.modules.storage") + utils_mod2 = _install_stub("cognee.modules.storage.utils") + utils_mod2.JSONEncoder = object + + # shared.logging_utils.get_logger stub + shared_pkg = _install_stub("cognee.shared") + logging_utils_mod = _install_stub("cognee.shared.logging_utils") + + class _Logger: + def debug(self, *a, **k): + pass + + def error(self, *a, **k): + pass + + logging_utils_mod.get_logger = lambda: _Logger() + + # Now load adapter.py by path + repo_root = _find_repo_root(os.path.dirname(__file__)) + adapter_path = os.path.join( + repo_root, "cognee", "infrastructure", "databases", "graph", "kuzu", "adapter.py" + ) + spec = importlib.util.spec_from_file_location( + "cognee.infrastructure.databases.graph.kuzu.adapter", adapter_path + ) + assert spec and spec.loader + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) # type: ignore[attr-defined] + + # Replace Database/Connection in the loaded module + monkeypatch.setattr(mod, "Database", _FakeDatabase, raising=True) + monkeypatch.setattr(mod, "Connection", _FakeConnection, raising=True) + + # Patch migration helpers inside the kuzu_migrate module used by adapter + # Load kuzu_migrate similarly + km_path = os.path.join( + repo_root, "cognee", "infrastructure", "databases", "graph", "kuzu", "kuzu_migrate.py" + ) + km_spec = importlib.util.spec_from_file_location("kuzu_migrate_under_test", km_path) + km_mod = importlib.util.module_from_spec(km_spec) + assert km_spec and km_spec.loader + km_spec.loader.exec_module(km_mod) # type: ignore[attr-defined] + + calls = {"migrated": False} + + def fake_read_version(_): + return "0.9.0" + + def fake_migration(**kwargs): + calls["migrated"] = True + + monkeypatch.setattr(km_mod, "read_kuzu_storage_version", fake_read_version) + monkeypatch.setattr(km_mod, "kuzu_migration", fake_migration) + + # Ensure adapter refers to our loaded km_mod + monkeypatch.setitem( + sys.modules, "cognee.infrastructure.databases.graph.kuzu.kuzu_migrate", km_mod + ) + + return mod, calls + + +def test_adapter_s3_auto_migration(monkeypatch): + mod, calls = _load_adapter_with_stubs(monkeypatch) + + # ensure pull/push do not touch real S3 + monkeypatch.setattr(mod.KuzuAdapter, "pull_from_s3", lambda self: None) + monkeypatch.setattr(mod.KuzuAdapter, "push_to_s3", lambda self: None) + + adapter = mod.KuzuAdapter("s3://bucket/db") + assert calls["migrated"] is True diff --git a/cognee/tests/unit/infrastructure/databases/graph/kuzu/test_kuzu_migrate_s3.py b/cognee/tests/unit/infrastructure/databases/graph/kuzu/test_kuzu_migrate_s3.py new file mode 100644 index 000000000..4a116b94b --- /dev/null +++ b/cognee/tests/unit/infrastructure/databases/graph/kuzu/test_kuzu_migrate_s3.py @@ -0,0 +1,195 @@ +import io +import os +import struct +import importlib.util +from types import ModuleType +from typing import Dict + +import pytest + + +class _FakeS3: + """ + Minimal fake S3 client implementing the subset used by kuzu_migrate helpers. + + Store layout is a dict mapping string keys to bytes (files). Directories are + implicit via key prefixes. Methods operate on s3:// style keys. + """ + + def __init__(self, initial: Dict[str, bytes] | None = None): + self.store: Dict[str, bytes] = dict(initial or {}) + + # Helpers + def _norm(self, path: str) -> str: + return path.rstrip("/") + + def _is_prefix(self, prefix: str, key: str) -> bool: + p = self._norm(prefix) + return key == p or key.startswith(p + "/") + + # API used by kuzu_migrate + def exists(self, path: str) -> bool: + p = self._norm(path) + if p in self.store: + return True + # any key under this prefix implies existence as a directory + return any(self._is_prefix(p, k) for k in self.store) + + def isdir(self, path: str) -> bool: + p = self._norm(path) + # A directory is assumed if there is any key with this prefix and that key isn't exactly the same + return any(self._is_prefix(p, k) and k != p for k in self.store) + + def isfile(self, path: str) -> bool: + p = self._norm(path) + return p in self.store + + def open(self, path: str, mode: str = "rb"): + p = self._norm(path) + if "r" in mode: + if p not in self.store: + raise FileNotFoundError(p) + return io.BytesIO(self.store[p]) + elif "w" in mode: + buf = io.BytesIO() + + def _close(): + self.store[p] = buf.getvalue() + + # monkeypatch close so that written data is persisted on close + orig_close = buf.close + + def close_wrapper(): + _close() + orig_close() + + buf.close = close_wrapper # type: ignore[assignment] + return buf + else: + raise ValueError(f"Unsupported mode: {mode}") + + def copy(self, src: str, dst: str, recursive: bool = True): + s = self._norm(src) + d = self._norm(dst) + if recursive: + # copy all keys under src prefix to dst prefix + to_copy = [k for k in self.store if self._is_prefix(s, k)] + for key in to_copy: + new_key = key.replace(s, d, 1) + self.store[new_key] = self.store[key] + else: + if s not in self.store: + raise FileNotFoundError(s) + self.store[d] = self.store[s] + + def rm(self, path: str, recursive: bool = False): + p = self._norm(path) + if recursive: + for key in list(self.store.keys()): + if self._is_prefix(p, key): + del self.store[key] + else: + if p in self.store: + del self.store[p] + else: + raise FileNotFoundError(p) + + +def _load_module_by_path(path: str) -> ModuleType: + spec = importlib.util.spec_from_file_location("kuzu_migrate_under_test", path) + assert spec and spec.loader + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) # type: ignore[attr-defined] + return mod + + +def _find_repo_root(start_path: str) -> str: + cur = os.path.abspath(start_path) + while True: + if os.path.exists(os.path.join(cur, "pyproject.toml")): + return cur + parent = os.path.dirname(cur) + if parent == cur: + raise RuntimeError("Could not locate repository root from: " + start_path) + cur = parent + + +@pytest.fixture +def km_module(monkeypatch): + # Load the kuzu_migrate module directly from file to avoid importing package __init__ + repo_root = _find_repo_root(os.path.dirname(__file__)) + target = os.path.join( + repo_root, "cognee", "infrastructure", "databases", "graph", "kuzu", "kuzu_migrate.py" + ) + mod = _load_module_by_path(target) + return mod + + +@pytest.fixture +def patch_get_s3_client(monkeypatch, km_module): + # Provide each test with its own fake client instance + client = _FakeS3() + monkeypatch.setattr(km_module, "_get_s3_client", lambda: client) + return client + + +def _make_catalog_bytes(version_code: int) -> bytes: + # 4 bytes header skipped + 8 bytes little-endian version code + return b"KUZ\x00" + struct.pack("