format using ruff

This commit is contained in:
Pavan Chilukuri 2025-08-02 13:52:55 -05:00
parent be33be7438
commit 585ad96c47
2 changed files with 66 additions and 73 deletions

View file

@ -185,16 +185,13 @@ async def readiness_check():
if health_status.status == HealthStatus.UNHEALTHY: if health_status.status == HealthStatus.UNHEALTHY:
return JSONResponse( return JSONResponse(
status_code=503, status_code=503,
content={"status": "not ready", "reason": "critical services unhealthy"} content={"status": "not ready", "reason": "critical services unhealthy"},
) )
return JSONResponse( return JSONResponse(status_code=200, content={"status": "ready"})
status_code=200,
content={"status": "ready"}
)
except Exception as e: except Exception as e:
return JSONResponse( return JSONResponse(
status_code=503, status_code=503,
content={"status": "not ready", "reason": f"health check failed: {str(e)}"} content={"status": "not ready", "reason": f"health check failed: {str(e)}"},
) )
@ -210,18 +207,12 @@ async def detailed_health_check():
status_code = 503 status_code = 503
elif health_status.status == HealthStatus.DEGRADED: elif health_status.status == HealthStatus.DEGRADED:
status_code = 200 # Degraded is still operational status_code = 200 # Degraded is still operational
return JSONResponse( return JSONResponse(status_code=status_code, content=health_status.model_dump())
status_code=status_code,
content=health_status.model_dump()
)
except Exception as e: except Exception as e:
return JSONResponse( return JSONResponse(
status_code=503, status_code=503,
content={ content={"status": "unhealthy", "error": f"Health check system failure: {str(e)}"},
"status": "unhealthy",
"error": f"Health check system failure: {str(e)}"
}
) )

View file

@ -42,23 +42,25 @@ class HealthChecker:
"""Check relational database health.""" """Check relational database health."""
start_time = time.time() start_time = time.time()
try: try:
from cognee.infrastructure.databases.relational.get_relational_engine import get_relational_engine from cognee.infrastructure.databases.relational.get_relational_engine import (
get_relational_engine,
)
from cognee.infrastructure.databases.relational.config import get_relational_config from cognee.infrastructure.databases.relational.config import get_relational_config
config = get_relational_config() config = get_relational_config()
engine = get_relational_engine() engine = get_relational_engine()
# Test connection by creating a session # Test connection by creating a session
session = await engine.get_session() session = await engine.get_session()
if session: if session:
await session.close() await session.close()
response_time = int((time.time() - start_time) * 1000) response_time = int((time.time() - start_time) * 1000)
return ComponentHealth( return ComponentHealth(
status=HealthStatus.HEALTHY, status=HealthStatus.HEALTHY,
provider=config.db_provider, provider=config.db_provider,
response_time_ms=response_time, response_time_ms=response_time,
details="Connection successful" details="Connection successful",
) )
except Exception as e: except Exception as e:
response_time = int((time.time() - start_time) * 1000) response_time = int((time.time() - start_time) * 1000)
@ -66,7 +68,7 @@ class HealthChecker:
status=HealthStatus.UNHEALTHY, status=HealthStatus.UNHEALTHY,
provider="unknown", provider="unknown",
response_time_ms=response_time, response_time_ms=response_time,
details=f"Connection failed: {str(e)}" details=f"Connection failed: {str(e)}",
) )
async def check_vector_db(self) -> ComponentHealth: async def check_vector_db(self) -> ComponentHealth:
@ -75,23 +77,23 @@ class HealthChecker:
try: try:
from cognee.infrastructure.databases.vector.get_vector_engine import get_vector_engine from cognee.infrastructure.databases.vector.get_vector_engine import get_vector_engine
from cognee.infrastructure.databases.vector.config import get_vectordb_config from cognee.infrastructure.databases.vector.config import get_vectordb_config
config = get_vectordb_config() config = get_vectordb_config()
engine = get_vector_engine() engine = get_vector_engine()
# Test basic operation - just check if engine is accessible # Test basic operation - just check if engine is accessible
if hasattr(engine, 'health_check'): if hasattr(engine, "health_check"):
await engine.health_check() await engine.health_check()
elif hasattr(engine, 'list_tables'): elif hasattr(engine, "list_tables"):
# For LanceDB and similar # For LanceDB and similar
engine.list_tables() engine.list_tables()
response_time = int((time.time() - start_time) * 1000) response_time = int((time.time() - start_time) * 1000)
return ComponentHealth( return ComponentHealth(
status=HealthStatus.HEALTHY, status=HealthStatus.HEALTHY,
provider=config.vector_db_provider, provider=config.vector_db_provider,
response_time_ms=response_time, response_time_ms=response_time,
details="Index accessible" details="Index accessible",
) )
except Exception as e: except Exception as e:
response_time = int((time.time() - start_time) * 1000) response_time = int((time.time() - start_time) * 1000)
@ -99,7 +101,7 @@ class HealthChecker:
status=HealthStatus.UNHEALTHY, status=HealthStatus.UNHEALTHY,
provider="unknown", provider="unknown",
response_time_ms=response_time, response_time_ms=response_time,
details=f"Connection failed: {str(e)}" details=f"Connection failed: {str(e)}",
) )
async def check_graph_db(self) -> ComponentHealth: async def check_graph_db(self) -> ComponentHealth:
@ -108,23 +110,23 @@ class HealthChecker:
try: try:
from cognee.infrastructure.databases.graph.get_graph_engine import get_graph_engine from cognee.infrastructure.databases.graph.get_graph_engine import get_graph_engine
from cognee.infrastructure.databases.graph.config import get_graph_config from cognee.infrastructure.databases.graph.config import get_graph_config
config = get_graph_config() config = get_graph_config()
engine = await get_graph_engine() engine = await get_graph_engine()
# Test basic operation - just check if engine is accessible # Test basic operation - just check if engine is accessible
if hasattr(engine, 'health_check'): if hasattr(engine, "health_check"):
await engine.health_check() await engine.health_check()
elif hasattr(engine, 'get_nodes'): elif hasattr(engine, "get_nodes"):
# Basic connectivity test # Basic connectivity test
pass pass
response_time = int((time.time() - start_time) * 1000) response_time = int((time.time() - start_time) * 1000)
return ComponentHealth( return ComponentHealth(
status=HealthStatus.HEALTHY, status=HealthStatus.HEALTHY,
provider=config.graph_database_provider, provider=config.graph_database_provider,
response_time_ms=response_time, response_time_ms=response_time,
details="Schema validated" details="Schema validated",
) )
except Exception as e: except Exception as e:
response_time = int((time.time() - start_time) * 1000) response_time = int((time.time() - start_time) * 1000)
@ -132,7 +134,7 @@ class HealthChecker:
status=HealthStatus.UNHEALTHY, status=HealthStatus.UNHEALTHY,
provider="unknown", provider="unknown",
response_time_ms=response_time, response_time_ms=response_time,
details=f"Connection failed: {str(e)}" details=f"Connection failed: {str(e)}",
) )
async def check_file_storage(self) -> ComponentHealth: async def check_file_storage(self) -> ComponentHealth:
@ -142,19 +144,19 @@ class HealthChecker:
import os import os
from cognee.infrastructure.files.storage.get_file_storage import get_file_storage from cognee.infrastructure.files.storage.get_file_storage import get_file_storage
from cognee.base_config import get_base_config from cognee.base_config import get_base_config
base_config = get_base_config() base_config = get_base_config()
storage = get_file_storage(base_config.data_root_directory) storage = get_file_storage(base_config.data_root_directory)
# Determine provider # Determine provider
provider = "s3" if base_config.data_root_directory.startswith("s3://") else "local" provider = "s3" if base_config.data_root_directory.startswith("s3://") else "local"
# Test storage accessibility - for local storage, just check directory exists # Test storage accessibility - for local storage, just check directory exists
if provider == "local": if provider == "local":
os.makedirs(base_config.data_root_directory, exist_ok=True) os.makedirs(base_config.data_root_directory, exist_ok=True)
# Simple write/read test # Simple write/read test
test_file = os.path.join(base_config.data_root_directory, "health_check_test") test_file = os.path.join(base_config.data_root_directory, "health_check_test")
with open(test_file, 'w') as f: with open(test_file, "w") as f:
f.write("test") f.write("test")
os.remove(test_file) os.remove(test_file)
else: else:
@ -162,13 +164,13 @@ class HealthChecker:
test_path = "health_check_test" test_path = "health_check_test"
await storage.store(test_path, b"test") await storage.store(test_path, b"test")
await storage.delete(test_path) await storage.delete(test_path)
response_time = int((time.time() - start_time) * 1000) response_time = int((time.time() - start_time) * 1000)
return ComponentHealth( return ComponentHealth(
status=HealthStatus.HEALTHY, status=HealthStatus.HEALTHY,
provider=provider, provider=provider,
response_time_ms=response_time, response_time_ms=response_time,
details="Storage accessible" details="Storage accessible",
) )
except Exception as e: except Exception as e:
response_time = int((time.time() - start_time) * 1000) response_time = int((time.time() - start_time) * 1000)
@ -176,7 +178,7 @@ class HealthChecker:
status=HealthStatus.UNHEALTHY, status=HealthStatus.UNHEALTHY,
provider="unknown", provider="unknown",
response_time_ms=response_time, response_time_ms=response_time,
details=f"Storage test failed: {str(e)}" details=f"Storage test failed: {str(e)}",
) )
async def check_llm_provider(self) -> ComponentHealth: async def check_llm_provider(self) -> ComponentHealth:
@ -185,9 +187,9 @@ class HealthChecker:
try: try:
from cognee.infrastructure.llm.get_llm_client import get_llm_client from cognee.infrastructure.llm.get_llm_client import get_llm_client
from cognee.infrastructure.llm.config import get_llm_config from cognee.infrastructure.llm.config import get_llm_config
config = get_llm_config() config = get_llm_config()
# Simple configuration check - don't actually call the API # Simple configuration check - don't actually call the API
if config.llm_api_key or config.llm_provider == "ollama": if config.llm_api_key or config.llm_provider == "ollama":
status = HealthStatus.HEALTHY status = HealthStatus.HEALTHY
@ -195,13 +197,13 @@ class HealthChecker:
else: else:
status = HealthStatus.DEGRADED status = HealthStatus.DEGRADED
details = "No API key configured" details = "No API key configured"
response_time = int((time.time() - start_time) * 1000) response_time = int((time.time() - start_time) * 1000)
return ComponentHealth( return ComponentHealth(
status=status, status=status,
provider=config.llm_provider, provider=config.llm_provider,
response_time_ms=response_time, response_time_ms=response_time,
details=details details=details,
) )
except Exception as e: except Exception as e:
response_time = int((time.time() - start_time) * 1000) response_time = int((time.time() - start_time) * 1000)
@ -209,24 +211,26 @@ class HealthChecker:
status=HealthStatus.DEGRADED, status=HealthStatus.DEGRADED,
provider="unknown", provider="unknown",
response_time_ms=response_time, response_time_ms=response_time,
details=f"Config check failed: {str(e)}" details=f"Config check failed: {str(e)}",
) )
async def check_embedding_service(self) -> ComponentHealth: async def check_embedding_service(self) -> ComponentHealth:
"""Check embedding service health (non-critical).""" """Check embedding service health (non-critical)."""
start_time = time.time() start_time = time.time()
try: try:
from cognee.infrastructure.databases.vector.embeddings.get_embedding_engine import get_embedding_engine from cognee.infrastructure.databases.vector.embeddings.get_embedding_engine import (
get_embedding_engine,
)
# Just check if we can get the engine without calling it # Just check if we can get the engine without calling it
engine = get_embedding_engine() get_embedding_engine()
response_time = int((time.time() - start_time) * 1000) response_time = int((time.time() - start_time) * 1000)
return ComponentHealth( return ComponentHealth(
status=HealthStatus.HEALTHY, status=HealthStatus.HEALTHY,
provider="configured", provider="configured",
response_time_ms=response_time, response_time_ms=response_time,
details="Embedding engine accessible" details="Embedding engine accessible",
) )
except Exception as e: except Exception as e:
response_time = int((time.time() - start_time) * 1000) response_time = int((time.time() - start_time) * 1000)
@ -234,13 +238,13 @@ class HealthChecker:
status=HealthStatus.DEGRADED, status=HealthStatus.DEGRADED,
provider="unknown", provider="unknown",
response_time_ms=response_time, response_time_ms=response_time,
details=f"Embedding engine failed: {str(e)}" details=f"Embedding engine failed: {str(e)}",
) )
async def get_health_status(self, detailed: bool = False) -> HealthResponse: async def get_health_status(self, detailed: bool = False) -> HealthResponse:
"""Get comprehensive health status.""" """Get comprehensive health status."""
components = {} components = {}
# Critical services # Critical services
critical_checks = [ critical_checks = [
("relational_db", self.check_relational_db()), ("relational_db", self.check_relational_db()),
@ -248,72 +252,70 @@ class HealthChecker:
("graph_db", self.check_graph_db()), ("graph_db", self.check_graph_db()),
("file_storage", self.check_file_storage()), ("file_storage", self.check_file_storage()),
] ]
# Non-critical services (only for detailed checks) # Non-critical services (only for detailed checks)
non_critical_checks = [ non_critical_checks = [
("llm_provider", self.check_llm_provider()), ("llm_provider", self.check_llm_provider()),
("embedding_service", self.check_embedding_service()), ("embedding_service", self.check_embedding_service()),
] ]
# Run critical checks # Run critical checks
critical_results = await asyncio.gather( critical_results = await asyncio.gather(
*[check for _, check in critical_checks], *[check for _, check in critical_checks], return_exceptions=True
return_exceptions=True
) )
for (name, _), result in zip(critical_checks, critical_results): for (name, _), result in zip(critical_checks, critical_results):
if isinstance(result, Exception): if isinstance(result, Exception):
components[name] = ComponentHealth( components[name] = ComponentHealth(
status=HealthStatus.UNHEALTHY, status=HealthStatus.UNHEALTHY,
provider="unknown", provider="unknown",
response_time_ms=0, response_time_ms=0,
details=f"Health check failed: {str(result)}" details=f"Health check failed: {str(result)}",
) )
else: else:
components[name] = result components[name] = result
# Run non-critical checks if detailed # Run non-critical checks if detailed
if detailed: if detailed:
non_critical_results = await asyncio.gather( non_critical_results = await asyncio.gather(
*[check for _, check in non_critical_checks], *[check for _, check in non_critical_checks], return_exceptions=True
return_exceptions=True
) )
for (name, _), result in zip(non_critical_checks, non_critical_results): for (name, _), result in zip(non_critical_checks, non_critical_results):
if isinstance(result, Exception): if isinstance(result, Exception):
components[name] = ComponentHealth( components[name] = ComponentHealth(
status=HealthStatus.DEGRADED, status=HealthStatus.DEGRADED,
provider="unknown", provider="unknown",
response_time_ms=0, response_time_ms=0,
details=f"Health check failed: {str(result)}" details=f"Health check failed: {str(result)}",
) )
else: else:
components[name] = result components[name] = result
# Determine overall status # Determine overall status
critical_unhealthy = any( critical_unhealthy = any(
comp.status == HealthStatus.UNHEALTHY comp.status == HealthStatus.UNHEALTHY
for name, comp in components.items() for name, comp in components.items()
if name in ["relational_db", "vector_db", "graph_db", "file_storage"] if name in ["relational_db", "vector_db", "graph_db", "file_storage"]
) )
has_degraded = any(comp.status == HealthStatus.DEGRADED for comp in components.values()) has_degraded = any(comp.status == HealthStatus.DEGRADED for comp in components.values())
if critical_unhealthy: if critical_unhealthy:
overall_status = HealthStatus.UNHEALTHY overall_status = HealthStatus.UNHEALTHY
elif has_degraded: elif has_degraded:
overall_status = HealthStatus.DEGRADED overall_status = HealthStatus.DEGRADED
else: else:
overall_status = HealthStatus.HEALTHY overall_status = HealthStatus.HEALTHY
return HealthResponse( return HealthResponse(
status=overall_status, status=overall_status,
timestamp=datetime.now(timezone.utc).isoformat(), timestamp=datetime.now(timezone.utc).isoformat(),
version=get_cognee_version(), version=get_cognee_version(),
uptime=int(time.time() - self.start_time), uptime=int(time.time() - self.start_time),
components=components components=components,
) )
# Global health checker instance # Global health checker instance
health_checker = HealthChecker() health_checker = HealthChecker()