mypy: fix RemoteKuzuAdapter mypy errors

This commit is contained in:
Daulet Amirkhanov 2025-09-04 15:18:37 +01:00
parent 1b221148c3
commit a4c48d1104

View file

@ -2,7 +2,7 @@
from cognee.shared.logging_utils import get_logger from cognee.shared.logging_utils import get_logger
import json import json
from typing import Dict, Any, List, Optional, Tuple from typing import Dict, Any, List, Optional, Tuple, Union
import aiohttp import aiohttp
from uuid import UUID from uuid import UUID
@ -14,7 +14,7 @@ logger = get_logger()
class UUIDEncoder(json.JSONEncoder): class UUIDEncoder(json.JSONEncoder):
"""Custom JSON encoder that handles UUID objects.""" """Custom JSON encoder that handles UUID objects."""
def default(self, obj): def default(self, obj: Union[UUID, Any]) -> Any:
if isinstance(obj, UUID): if isinstance(obj, UUID):
return str(obj) return str(obj)
return super().default(obj) return super().default(obj)
@ -36,7 +36,7 @@ class RemoteKuzuAdapter(KuzuAdapter):
self.api_url = api_url self.api_url = api_url
self.username = username self.username = username
self.password = password self.password = password
self._session = None self._session: Optional[aiohttp.ClientSession] = None
self._schema_initialized = False self._schema_initialized = False
async def _get_session(self) -> aiohttp.ClientSession: async def _get_session(self) -> aiohttp.ClientSession:
@ -45,13 +45,13 @@ class RemoteKuzuAdapter(KuzuAdapter):
self._session = aiohttp.ClientSession() self._session = aiohttp.ClientSession()
return self._session return self._session
async def close(self): async def close(self) -> None:
"""Close the adapter and its session.""" """Close the adapter and its session."""
if self._session and not self._session.closed: if self._session and not self._session.closed:
await self._session.close() await self._session.close()
self._session = None self._session = None
async def _make_request(self, endpoint: str, data: dict) -> dict: async def _make_request(self, endpoint: str, data: dict[str, Any]) -> dict[str, Any]:
"""Make a request to the Kuzu API.""" """Make a request to the Kuzu API."""
url = f"{self.api_url}{endpoint}" url = f"{self.api_url}{endpoint}"
session = await self._get_session() session = await self._get_session()
@ -73,13 +73,13 @@ class RemoteKuzuAdapter(KuzuAdapter):
status=response.status, status=response.status,
message=error_detail, message=error_detail,
) )
return await response.json() return await response.json() # type: ignore
except aiohttp.ClientError as e: except aiohttp.ClientError as e:
logger.error(f"API request failed: {str(e)}") logger.error(f"API request failed: {str(e)}")
logger.error(f"Request data: {data}") logger.error(f"Request data: {data}")
raise raise
async def query(self, query: str, params: Optional[dict] = None) -> List[Tuple]: async def query(self, query: str, params: Optional[dict[str, Any]] = None) -> List[Tuple[Any, ...]]:
"""Execute a Kuzu query via the REST API.""" """Execute a Kuzu query via the REST API."""
try: try:
# Initialize schema if needed # Initialize schema if needed
@ -126,7 +126,7 @@ class RemoteKuzuAdapter(KuzuAdapter):
logger.error(f"Failed to check schema: {e}") logger.error(f"Failed to check schema: {e}")
return False return False
async def _create_schema(self): async def _create_schema(self) -> None:
"""Create the required schema tables.""" """Create the required schema tables."""
try: try:
# Create Node table if it doesn't exist # Create Node table if it doesn't exist
@ -180,7 +180,7 @@ class RemoteKuzuAdapter(KuzuAdapter):
logger.error(f"Failed to create schema: {e}") logger.error(f"Failed to create schema: {e}")
raise raise
async def _initialize_schema(self): async def _initialize_schema(self) -> None:
"""Initialize the database schema if it doesn't exist.""" """Initialize the database schema if it doesn't exist."""
if self._schema_initialized: if self._schema_initialized:
return return