From 585ad96c47c0a8e1d7d76fd8ae5e7d504045ddc7 Mon Sep 17 00:00:00 2001 From: Pavan Chilukuri <23178099+chilupa@users.noreply.github.com> Date: Sat, 2 Aug 2025 13:52:55 -0500 Subject: [PATCH] format using ruff --- cognee/api/client.py | 21 +++----- cognee/api/health.py | 118 ++++++++++++++++++++++--------------------- 2 files changed, 66 insertions(+), 73 deletions(-) diff --git a/cognee/api/client.py b/cognee/api/client.py index 9275cee93..982b97556 100644 --- a/cognee/api/client.py +++ b/cognee/api/client.py @@ -185,16 +185,13 @@ async def readiness_check(): if health_status.status == HealthStatus.UNHEALTHY: return JSONResponse( status_code=503, - content={"status": "not ready", "reason": "critical services unhealthy"} + content={"status": "not ready", "reason": "critical services unhealthy"}, ) - return JSONResponse( - status_code=200, - content={"status": "ready"} - ) + return JSONResponse(status_code=200, content={"status": "ready"}) except Exception as e: return JSONResponse( status_code=503, - content={"status": "not ready", "reason": f"health check failed: {str(e)}"} + content={"status": "not ready", "reason": f"health check failed: {str(e)}"}, ) @@ -210,18 +207,12 @@ async def detailed_health_check(): status_code = 503 elif health_status.status == HealthStatus.DEGRADED: status_code = 200 # Degraded is still operational - - return JSONResponse( - status_code=status_code, - content=health_status.model_dump() - ) + + return JSONResponse(status_code=status_code, content=health_status.model_dump()) except Exception as e: return JSONResponse( status_code=503, - content={ - "status": "unhealthy", - "error": f"Health check system failure: {str(e)}" - } + content={"status": "unhealthy", "error": f"Health check system failure: {str(e)}"}, ) diff --git a/cognee/api/health.py b/cognee/api/health.py index b435da215..a8c9dc7a0 100644 --- a/cognee/api/health.py +++ b/cognee/api/health.py @@ -42,23 +42,25 @@ class HealthChecker: """Check relational database health.""" start_time = time.time() try: - from cognee.infrastructure.databases.relational.get_relational_engine import get_relational_engine + from cognee.infrastructure.databases.relational.get_relational_engine import ( + get_relational_engine, + ) from cognee.infrastructure.databases.relational.config import get_relational_config - + config = get_relational_config() engine = get_relational_engine() - + # Test connection by creating a session session = await engine.get_session() if session: await session.close() - + response_time = int((time.time() - start_time) * 1000) return ComponentHealth( status=HealthStatus.HEALTHY, provider=config.db_provider, response_time_ms=response_time, - details="Connection successful" + details="Connection successful", ) except Exception as e: response_time = int((time.time() - start_time) * 1000) @@ -66,7 +68,7 @@ class HealthChecker: status=HealthStatus.UNHEALTHY, provider="unknown", response_time_ms=response_time, - details=f"Connection failed: {str(e)}" + details=f"Connection failed: {str(e)}", ) async def check_vector_db(self) -> ComponentHealth: @@ -75,23 +77,23 @@ class HealthChecker: try: from cognee.infrastructure.databases.vector.get_vector_engine import get_vector_engine from cognee.infrastructure.databases.vector.config import get_vectordb_config - + config = get_vectordb_config() engine = get_vector_engine() - + # Test basic operation - just check if engine is accessible - if hasattr(engine, 'health_check'): + if hasattr(engine, "health_check"): await engine.health_check() - elif hasattr(engine, 'list_tables'): + elif hasattr(engine, "list_tables"): # For LanceDB and similar engine.list_tables() - + response_time = int((time.time() - start_time) * 1000) return ComponentHealth( status=HealthStatus.HEALTHY, provider=config.vector_db_provider, response_time_ms=response_time, - details="Index accessible" + details="Index accessible", ) except Exception as e: response_time = int((time.time() - start_time) * 1000) @@ -99,7 +101,7 @@ class HealthChecker: status=HealthStatus.UNHEALTHY, provider="unknown", response_time_ms=response_time, - details=f"Connection failed: {str(e)}" + details=f"Connection failed: {str(e)}", ) async def check_graph_db(self) -> ComponentHealth: @@ -108,23 +110,23 @@ class HealthChecker: try: from cognee.infrastructure.databases.graph.get_graph_engine import get_graph_engine from cognee.infrastructure.databases.graph.config import get_graph_config - + config = get_graph_config() engine = await get_graph_engine() - + # Test basic operation - just check if engine is accessible - if hasattr(engine, 'health_check'): + if hasattr(engine, "health_check"): await engine.health_check() - elif hasattr(engine, 'get_nodes'): + elif hasattr(engine, "get_nodes"): # Basic connectivity test pass - + response_time = int((time.time() - start_time) * 1000) return ComponentHealth( status=HealthStatus.HEALTHY, provider=config.graph_database_provider, response_time_ms=response_time, - details="Schema validated" + details="Schema validated", ) except Exception as e: response_time = int((time.time() - start_time) * 1000) @@ -132,7 +134,7 @@ class HealthChecker: status=HealthStatus.UNHEALTHY, provider="unknown", response_time_ms=response_time, - details=f"Connection failed: {str(e)}" + details=f"Connection failed: {str(e)}", ) async def check_file_storage(self) -> ComponentHealth: @@ -142,19 +144,19 @@ class HealthChecker: import os from cognee.infrastructure.files.storage.get_file_storage import get_file_storage from cognee.base_config import get_base_config - + base_config = get_base_config() storage = get_file_storage(base_config.data_root_directory) - + # Determine provider provider = "s3" if base_config.data_root_directory.startswith("s3://") else "local" - + # Test storage accessibility - for local storage, just check directory exists if provider == "local": os.makedirs(base_config.data_root_directory, exist_ok=True) # Simple write/read test test_file = os.path.join(base_config.data_root_directory, "health_check_test") - with open(test_file, 'w') as f: + with open(test_file, "w") as f: f.write("test") os.remove(test_file) else: @@ -162,13 +164,13 @@ class HealthChecker: test_path = "health_check_test" await storage.store(test_path, b"test") await storage.delete(test_path) - + response_time = int((time.time() - start_time) * 1000) return ComponentHealth( status=HealthStatus.HEALTHY, provider=provider, response_time_ms=response_time, - details="Storage accessible" + details="Storage accessible", ) except Exception as e: response_time = int((time.time() - start_time) * 1000) @@ -176,7 +178,7 @@ class HealthChecker: status=HealthStatus.UNHEALTHY, provider="unknown", response_time_ms=response_time, - details=f"Storage test failed: {str(e)}" + details=f"Storage test failed: {str(e)}", ) async def check_llm_provider(self) -> ComponentHealth: @@ -185,9 +187,9 @@ class HealthChecker: try: from cognee.infrastructure.llm.get_llm_client import get_llm_client from cognee.infrastructure.llm.config import get_llm_config - + config = get_llm_config() - + # Simple configuration check - don't actually call the API if config.llm_api_key or config.llm_provider == "ollama": status = HealthStatus.HEALTHY @@ -195,13 +197,13 @@ class HealthChecker: else: status = HealthStatus.DEGRADED details = "No API key configured" - + response_time = int((time.time() - start_time) * 1000) return ComponentHealth( status=status, provider=config.llm_provider, response_time_ms=response_time, - details=details + details=details, ) except Exception as e: response_time = int((time.time() - start_time) * 1000) @@ -209,24 +211,26 @@ class HealthChecker: status=HealthStatus.DEGRADED, provider="unknown", response_time_ms=response_time, - details=f"Config check failed: {str(e)}" + details=f"Config check failed: {str(e)}", ) async def check_embedding_service(self) -> ComponentHealth: """Check embedding service health (non-critical).""" start_time = time.time() try: - from cognee.infrastructure.databases.vector.embeddings.get_embedding_engine import get_embedding_engine - + from cognee.infrastructure.databases.vector.embeddings.get_embedding_engine import ( + get_embedding_engine, + ) + # Just check if we can get the engine without calling it - engine = get_embedding_engine() - + get_embedding_engine() + response_time = int((time.time() - start_time) * 1000) return ComponentHealth( status=HealthStatus.HEALTHY, provider="configured", response_time_ms=response_time, - details="Embedding engine accessible" + details="Embedding engine accessible", ) except Exception as e: response_time = int((time.time() - start_time) * 1000) @@ -234,13 +238,13 @@ class HealthChecker: status=HealthStatus.DEGRADED, provider="unknown", response_time_ms=response_time, - details=f"Embedding engine failed: {str(e)}" + details=f"Embedding engine failed: {str(e)}", ) async def get_health_status(self, detailed: bool = False) -> HealthResponse: """Get comprehensive health status.""" components = {} - + # Critical services critical_checks = [ ("relational_db", self.check_relational_db()), @@ -248,72 +252,70 @@ class HealthChecker: ("graph_db", self.check_graph_db()), ("file_storage", self.check_file_storage()), ] - + # Non-critical services (only for detailed checks) non_critical_checks = [ ("llm_provider", self.check_llm_provider()), ("embedding_service", self.check_embedding_service()), ] - + # Run critical checks critical_results = await asyncio.gather( - *[check for _, check in critical_checks], - return_exceptions=True + *[check for _, check in critical_checks], return_exceptions=True ) - + for (name, _), result in zip(critical_checks, critical_results): if isinstance(result, Exception): components[name] = ComponentHealth( status=HealthStatus.UNHEALTHY, provider="unknown", response_time_ms=0, - details=f"Health check failed: {str(result)}" + details=f"Health check failed: {str(result)}", ) else: components[name] = result - + # Run non-critical checks if detailed if detailed: non_critical_results = await asyncio.gather( - *[check for _, check in non_critical_checks], - return_exceptions=True + *[check for _, check in non_critical_checks], return_exceptions=True ) - + for (name, _), result in zip(non_critical_checks, non_critical_results): if isinstance(result, Exception): components[name] = ComponentHealth( status=HealthStatus.DEGRADED, provider="unknown", response_time_ms=0, - details=f"Health check failed: {str(result)}" + details=f"Health check failed: {str(result)}", ) else: components[name] = result - + # Determine overall status critical_unhealthy = any( - comp.status == HealthStatus.UNHEALTHY - for name, comp in components.items() + comp.status == HealthStatus.UNHEALTHY + for name, comp in components.items() if name in ["relational_db", "vector_db", "graph_db", "file_storage"] ) - + has_degraded = any(comp.status == HealthStatus.DEGRADED for comp in components.values()) - + if critical_unhealthy: overall_status = HealthStatus.UNHEALTHY elif has_degraded: overall_status = HealthStatus.DEGRADED else: overall_status = HealthStatus.HEALTHY - + return HealthResponse( status=overall_status, timestamp=datetime.now(timezone.utc).isoformat(), version=get_cognee_version(), uptime=int(time.time() - self.start_time), - components=components + components=components, ) # Global health checker instance -health_checker = HealthChecker() \ No newline at end of file +health_checker = HealthChecker()