From 268db003e3732cd8e6dd6e9826ba22e814871991 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Fri, 9 Jan 2026 18:10:54 +0100 Subject: [PATCH 1/2] feat: Add delete to neo4j aura --- .../Neo4jAuraDevDatasetDatabaseHandler.py | 63 ++++++++++++------- 1 file changed, 41 insertions(+), 22 deletions(-) diff --git a/cognee/infrastructure/databases/graph/neo4j_driver/Neo4jAuraDevDatasetDatabaseHandler.py b/cognee/infrastructure/databases/graph/neo4j_driver/Neo4jAuraDevDatasetDatabaseHandler.py index eb6cbc55a..568bea528 100644 --- a/cognee/infrastructure/databases/graph/neo4j_driver/Neo4jAuraDevDatasetDatabaseHandler.py +++ b/cognee/infrastructure/databases/graph/neo4j_driver/Neo4jAuraDevDatasetDatabaseHandler.py @@ -1,10 +1,12 @@ import os +import aiohttp import asyncio import requests import base64 import hashlib from uuid import UUID from typing import Optional +from urllib.parse import urlparse from cryptography.fernet import Fernet from cognee.infrastructure.databases.graph import get_graph_config @@ -26,6 +28,13 @@ class Neo4jAuraDevDatasetDatabaseHandler(DatasetDatabaseHandlerInterface): - Requests should be made async, currently a blocking requests library is used. """ + # Client credentials and encryption + client_id = os.environ.get("NEO4J_CLIENT_ID", None) + client_secret = os.environ.get("NEO4J_CLIENT_SECRET", None) + tenant_id = os.environ.get("NEO4J_TENANT_ID", None) + encryption_env_key = os.environ.get("NEO4J_ENCRYPTION_KEY", "test_key") + encryption_key = base64.urlsafe_b64encode(hashlib.sha256(encryption_env_key.encode()).digest()) + @classmethod async def create_dataset(cls, dataset_id: Optional[UUID], user: Optional[User]) -> dict: """ @@ -48,31 +57,14 @@ class Neo4jAuraDevDatasetDatabaseHandler(DatasetDatabaseHandlerInterface): graph_db_name = f"{dataset_id}" - # Client credentials and encryption - client_id = os.environ.get("NEO4J_CLIENT_ID", None) - client_secret = os.environ.get("NEO4J_CLIENT_SECRET", None) - tenant_id = os.environ.get("NEO4J_TENANT_ID", None) - encryption_env_key = os.environ.get("NEO4J_ENCRYPTION_KEY", "test_key") - encryption_key = base64.urlsafe_b64encode( - hashlib.sha256(encryption_env_key.encode()).digest() - ) - cipher = Fernet(encryption_key) + cipher = Fernet(cls.encryption_key) - if client_id is None or client_secret is None or tenant_id is None: + if cls.client_id is None or cls.client_secret is None or cls.tenant_id is None: raise ValueError( "NEO4J_CLIENT_ID, NEO4J_CLIENT_SECRET, and NEO4J_TENANT_ID environment variables must be set to use Neo4j Aura DatasetDatabase Handling." ) - # Make the request with HTTP Basic Auth - def get_aura_token(client_id: str, client_secret: str) -> dict: - url = "https://api.neo4j.io/oauth/token" - data = {"grant_type": "client_credentials"} # sent as application/x-www-form-urlencoded - - resp = requests.post(url, data=data, auth=(client_id, client_secret)) - resp.raise_for_status() # raises if the request failed - return resp.json() - - resp = get_aura_token(client_id, client_secret) + resp = await cls._get_aura_token(cls.client_id, cls.client_secret) url = "https://api.neo4j.io/v1/instances" @@ -92,7 +84,7 @@ class Neo4jAuraDevDatasetDatabaseHandler(DatasetDatabaseHandlerInterface): 0:29 ], # TODO: Find better name to name Neo4j instance within 30 character limit "type": "professional-db", - "tenant_id": tenant_id, + "tenant_id": cls.tenant_id, "cloud_provider": "gcp", } @@ -165,4 +157,31 @@ class Neo4jAuraDevDatasetDatabaseHandler(DatasetDatabaseHandlerInterface): @classmethod async def delete_dataset(cls, dataset_database: DatasetDatabase): - pass + # Get dataset database information and credentials + dataset_database = await cls.resolve_dataset_connection_info(dataset_database) + + parsed_url = urlparse(dataset_database.graph_database_url) + instance_id = parsed_url.hostname.split(".")[0] + + url = f"https://api.neo4j.io/v1/instances/{instance_id}" + + # Get access token for Neo4j Aura API + resp = await cls._get_aura_token(cls.client_id, cls.client_secret) + + headers = { + "accept": "application/json", + "Authorization": f"Bearer {resp['access_token']}", + "Content-Type": "application/json", + } + + response = requests.delete(url, headers=headers) + return response + + @classmethod + async def _get_aura_token(cls, client_id: str, client_secret: str) -> dict: + url = "https://api.neo4j.io/oauth/token" + data = {"grant_type": "client_credentials"} # sent as application/x-www-form-urlencoded + + resp = requests.post(url, data=data, auth=(client_id, client_secret)) + resp.raise_for_status() # raises if the request failed + return resp.json() From 05961221f4b6c3cff3d9858ffddf7cee27653cfa Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Fri, 9 Jan 2026 19:08:52 +0100 Subject: [PATCH 2/2] refactor: use async requests library --- .../Neo4jAuraDevDatasetDatabaseHandler.py | 82 +++++++++++-------- 1 file changed, 49 insertions(+), 33 deletions(-) diff --git a/cognee/infrastructure/databases/graph/neo4j_driver/Neo4jAuraDevDatasetDatabaseHandler.py b/cognee/infrastructure/databases/graph/neo4j_driver/Neo4jAuraDevDatasetDatabaseHandler.py index 568bea528..bccf5020e 100644 --- a/cognee/infrastructure/databases/graph/neo4j_driver/Neo4jAuraDevDatasetDatabaseHandler.py +++ b/cognee/infrastructure/databases/graph/neo4j_driver/Neo4jAuraDevDatasetDatabaseHandler.py @@ -1,13 +1,13 @@ import os import aiohttp import asyncio -import requests import base64 import hashlib from uuid import UUID from typing import Optional from urllib.parse import urlparse from cryptography.fernet import Fernet +from aiohttp import BasicAuth from cognee.infrastructure.databases.graph import get_graph_config from cognee.modules.users.models import User, DatasetDatabase @@ -25,16 +25,8 @@ class Neo4jAuraDevDatasetDatabaseHandler(DatasetDatabaseHandlerInterface): Quality of life improvements: - Allow configuration of different Neo4j Aura plans and regions. - - Requests should be made async, currently a blocking requests library is used. """ - # Client credentials and encryption - client_id = os.environ.get("NEO4J_CLIENT_ID", None) - client_secret = os.environ.get("NEO4J_CLIENT_SECRET", None) - tenant_id = os.environ.get("NEO4J_TENANT_ID", None) - encryption_env_key = os.environ.get("NEO4J_ENCRYPTION_KEY", "test_key") - encryption_key = base64.urlsafe_b64encode(hashlib.sha256(encryption_env_key.encode()).digest()) - @classmethod async def create_dataset(cls, dataset_id: Optional[UUID], user: Optional[User]) -> dict: """ @@ -57,20 +49,29 @@ class Neo4jAuraDevDatasetDatabaseHandler(DatasetDatabaseHandlerInterface): graph_db_name = f"{dataset_id}" - cipher = Fernet(cls.encryption_key) + # Client credentials and encryption + # Note: Should not be used as class variables so that they are not persisted in memory longer than needed + client_id = os.environ.get("NEO4J_CLIENT_ID", None) + client_secret = os.environ.get("NEO4J_CLIENT_SECRET", None) + tenant_id = os.environ.get("NEO4J_TENANT_ID", None) + encryption_env_key = os.environ.get("NEO4J_ENCRYPTION_KEY", "test_key") + encryption_key = base64.urlsafe_b64encode( + hashlib.sha256(encryption_env_key.encode()).digest() + ) + cipher = Fernet(encryption_key) - if cls.client_id is None or cls.client_secret is None or cls.tenant_id is None: + if client_id is None or client_secret is None or tenant_id is None: raise ValueError( "NEO4J_CLIENT_ID, NEO4J_CLIENT_SECRET, and NEO4J_TENANT_ID environment variables must be set to use Neo4j Aura DatasetDatabase Handling." ) - resp = await cls._get_aura_token(cls.client_id, cls.client_secret) + resp_token = await cls._get_aura_token(client_id, client_secret) url = "https://api.neo4j.io/v1/instances" headers = { "accept": "application/json", - "Authorization": f"Bearer {resp['access_token']}", + "Authorization": f"Bearer {resp_token['access_token']}", "Content-Type": "application/json", } @@ -84,35 +85,42 @@ class Neo4jAuraDevDatasetDatabaseHandler(DatasetDatabaseHandlerInterface): 0:29 ], # TODO: Find better name to name Neo4j instance within 30 character limit "type": "professional-db", - "tenant_id": cls.tenant_id, + "tenant_id": tenant_id, "cloud_provider": "gcp", } - response = requests.post(url, headers=headers, json=payload) + async def _create_database_instance_request(): + async with aiohttp.ClientSession() as session: + async with session.post(url, headers=headers, json=payload) as resp: + resp.raise_for_status() + return await resp.json() + + resp_create = await _create_database_instance_request() graph_db_name = "neo4j" # Has to be 'neo4j' for Aura - graph_db_url = response.json()["data"]["connection_url"] - graph_db_key = resp["access_token"] - graph_db_username = response.json()["data"]["username"] - graph_db_password = response.json()["data"]["password"] + graph_db_url = resp_create["data"]["connection_url"] + graph_db_key = resp_token["access_token"] + graph_db_username = resp_create["data"]["username"] + graph_db_password = resp_create["data"]["password"] async def _wait_for_neo4j_instance_provisioning(instance_id: str, headers: dict): # Poll until the instance is running status_url = f"https://api.neo4j.io/v1/instances/{instance_id}" status = "" for attempt in range(30): # Try for up to ~5 minutes - status_resp = requests.get( - status_url, headers=headers - ) # TODO: Use async requests with httpx - status = status_resp.json()["data"]["status"] - if status.lower() == "running": - return - await asyncio.sleep(10) + async with aiohttp.ClientSession() as session: + async with session.get(status_url, headers=headers) as resp: + resp.raise_for_status() + status_resp = await resp.json() + status = status_resp["data"]["status"] + if status.lower() == "running": + return + await asyncio.sleep(10) raise TimeoutError( f"Neo4j instance '{graph_db_name}' did not become ready within 5 minutes. Status: {status}" ) - instance_id = response.json()["data"]["id"] + instance_id = resp_create["data"]["id"] await _wait_for_neo4j_instance_provisioning(instance_id, headers) encrypted_db_password_bytes = cipher.encrypt(graph_db_password.encode()) @@ -166,7 +174,10 @@ class Neo4jAuraDevDatasetDatabaseHandler(DatasetDatabaseHandlerInterface): url = f"https://api.neo4j.io/v1/instances/{instance_id}" # Get access token for Neo4j Aura API - resp = await cls._get_aura_token(cls.client_id, cls.client_secret) + # Client credentials + client_id = os.environ.get("NEO4J_CLIENT_ID", None) + client_secret = os.environ.get("NEO4J_CLIENT_SECRET", None) + resp = await cls._get_aura_token(client_id, client_secret) headers = { "accept": "application/json", @@ -174,14 +185,19 @@ class Neo4jAuraDevDatasetDatabaseHandler(DatasetDatabaseHandlerInterface): "Content-Type": "application/json", } - response = requests.delete(url, headers=headers) - return response + async with aiohttp.ClientSession() as session: + async with session.delete(url, headers=headers) as resp: + resp.raise_for_status() + return await resp.json() @classmethod async def _get_aura_token(cls, client_id: str, client_secret: str) -> dict: url = "https://api.neo4j.io/oauth/token" data = {"grant_type": "client_credentials"} # sent as application/x-www-form-urlencoded - resp = requests.post(url, data=data, auth=(client_id, client_secret)) - resp.raise_for_status() # raises if the request failed - return resp.json() + async with aiohttp.ClientSession() as session: + async with session.post( + url, data=data, auth=BasicAuth(client_id, client_secret) + ) as resp: + resp.raise_for_status() + return await resp.json()