new error system

This commit is contained in:
vasilije 2025-07-19 22:44:26 +02:00
parent dc03a52541
commit 19e22d14b8
8 changed files with 1106 additions and 137 deletions

View file

@ -15,6 +15,7 @@ from fastapi.exceptions import RequestValidationError
from fastapi.openapi.utils import get_openapi
from cognee.exceptions import CogneeApiError
from cognee.exceptions.enhanced_exceptions import CogneeBaseError
from cognee.shared.logging_utils import get_logger, setup_logging
from cognee.api.v1.permissions.routers import get_permissions_router
from cognee.api.v1.settings.routers import get_settings_router
@ -120,8 +121,27 @@ async def request_validation_exception_handler(request: Request, exc: RequestVal
)
@app.exception_handler(CogneeBaseError)
async def enhanced_exception_handler(_: Request, exc: CogneeBaseError) -> JSONResponse:
"""
Enhanced exception handler for the new exception hierarchy.
Provides standardized error responses with rich context and user guidance.
"""
# Log the full stack trace for debugging
logger.error(f"Enhanced exception caught: {exc.__class__.__name__}", exc_info=True)
# Create standardized error response
error_response = {"error": exc.to_dict()}
return JSONResponse(status_code=exc.status_code, content=error_response)
@app.exception_handler(CogneeApiError)
async def exception_handler(_: Request, exc: CogneeApiError) -> JSONResponse:
async def legacy_exception_handler(_: Request, exc: CogneeApiError) -> JSONResponse:
"""
Legacy exception handler for backward compatibility.
Handles old CogneeApiError instances with fallback formatting.
"""
detail = {}
if exc.name and exc.message and exc.status_code:
@ -136,7 +156,54 @@ async def exception_handler(_: Request, exc: CogneeApiError) -> JSONResponse:
# log the stack trace for easier serverside debugging
logger.error(format_exc())
return JSONResponse(status_code=status_code, content={"detail": detail["message"]})
# Convert to new format for consistency
error_response = {
"error": {
"type": exc.__class__.__name__,
"message": detail["message"],
"technical_message": detail["message"],
"suggestions": [
"Check the logs for more details",
"Try again or contact support if the issue persists",
],
"docs_link": "https://docs.cognee.ai/troubleshooting",
"is_retryable": False,
"context": {},
"operation": None,
}
}
return JSONResponse(status_code=status_code, content=error_response)
@app.exception_handler(Exception)
async def global_exception_handler(request: Request, exc: Exception) -> JSONResponse:
"""
Global exception handler for any unhandled exceptions.
Ensures all errors return a consistent format.
"""
logger.error(f"Unhandled exception in {request.url.path}: {str(exc)}", exc_info=True)
# Create a standardized error response for unexpected errors
error_response = {
"error": {
"type": "UnexpectedError",
"message": "An unexpected error occurred. Please try again.",
"technical_message": str(exc) if app_environment != "prod" else "Internal server error",
"suggestions": [
"Try your request again",
"Check if the issue persists",
"Contact support if the problem continues",
],
"docs_link": "https://docs.cognee.ai/troubleshooting",
"is_retryable": True,
"context": {"path": str(request.url.path), "method": request.method},
"operation": None,
}
}
return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=error_response)
@app.get("/")

View file

@ -11,6 +11,13 @@ import requests
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_authenticated_user
from cognee.exceptions import (
UnsupportedFileFormatError,
FileAccessError,
DatasetNotFoundError,
CogneeValidationError,
CogneeSystemError,
)
logger = get_logger()
@ -49,49 +56,143 @@ def get_add_router() -> APIRouter:
- Any relevant metadata from the ingestion process
## Error Codes
- **400 Bad Request**: Neither datasetId nor datasetName provided
- **409 Conflict**: Error during add operation
- **400 Bad Request**: Missing required parameters or invalid input
- **422 Unprocessable Entity**: Unsupported file format or validation error
- **403 Forbidden**: User doesn't have permission to add to dataset
- **500 Internal Server Error**: System error during processing
## Notes
- To add data to datasets not owned by the user, use dataset_id (when ENABLE_BACKEND_ACCESS_CONTROL is set to True)
- GitHub repositories are cloned and all files are processed
- HTTP URLs are fetched and their content is processed
- The ALLOW_HTTP_REQUESTS environment variable controls URL processing
- Enhanced error messages provide specific guidance for fixing issues
"""
from cognee.api.v1.add import add as cognee_add
# Input validation with enhanced exceptions
if not datasetId and not datasetName:
raise ValueError("Either datasetId or datasetName must be provided.")
raise CogneeValidationError(
message="Either datasetId or datasetName must be provided",
user_message="You must specify either a dataset name or dataset ID.",
suggestions=[
"Provide a datasetName parameter (e.g., 'my_dataset')",
"Provide a datasetId parameter with a valid UUID",
"Check the API documentation for parameter examples",
],
docs_link="https://docs.cognee.ai/api/add",
context={"provided_dataset_name": datasetName, "provided_dataset_id": datasetId},
operation="add",
)
try:
if (
isinstance(data, str)
and data.startswith("http")
and (os.getenv("ALLOW_HTTP_REQUESTS", "true").lower() == "true")
):
if "github" in data:
if not data or len(data) == 0:
raise CogneeValidationError(
message="No data provided for upload",
user_message="You must provide data to add to the dataset.",
suggestions=[
"Upload one or more files",
"Provide a valid URL (if URL processing is enabled)",
"Check that your request includes the data parameter",
],
docs_link="https://docs.cognee.ai/guides/adding-data",
operation="add",
)
logger.info(
f"Adding {len(data)} items to dataset",
extra={
"dataset_name": datasetName,
"dataset_id": datasetId,
"user_id": user.id,
"item_count": len(data),
},
)
# Handle URL-based data (GitHub repos, HTTP URLs)
if (
len(data) == 1
and hasattr(data[0], "filename")
and isinstance(data[0].filename, str)
and data[0].filename.startswith("http")
and (os.getenv("ALLOW_HTTP_REQUESTS", "true").lower() == "true")
):
url = data[0].filename
if "github" in url:
try:
# Perform git clone if the URL is from GitHub
repo_name = data.split("/")[-1].replace(".git", "")
subprocess.run(["git", "clone", data, f".data/{repo_name}"], check=True)
repo_name = url.split("/")[-1].replace(".git", "")
subprocess.run(["git", "clone", url, f".data/{repo_name}"], check=True)
# TODO: Update add call with dataset info
await cognee_add(
result = await cognee_add(
"data://.data/",
f"{repo_name}",
)
else:
# Fetch and store the data from other types of URL using curl
response = requests.get(data)
except subprocess.CalledProcessError as e:
raise CogneeSystemError(
message=f"Failed to clone GitHub repository: {e}",
user_message=f"Could not clone the GitHub repository '{url}'.",
suggestions=[
"Check if the repository URL is correct",
"Verify the repository is public or you have access",
"Try cloning the repository manually to test access",
],
context={"url": url, "repo_name": repo_name, "error": str(e)},
operation="add",
)
else:
try:
# Fetch and store the data from other types of URL
response = requests.get(url, timeout=30)
response.raise_for_status()
file_data = await response.content()
file_data = response.content
# TODO: Update add call with dataset info
return await cognee_add(file_data)
else:
add_run = await cognee_add(data, datasetName, user=user, dataset_id=datasetId)
result = await cognee_add(file_data)
except requests.RequestException as e:
raise CogneeSystemError(
message=f"Failed to fetch URL: {e}",
user_message=f"Could not fetch content from '{url}'.",
suggestions=[
"Check if the URL is accessible",
"Verify your internet connection",
"Try accessing the URL in a browser",
"Check if the URL requires authentication",
],
context={"url": url, "error": str(e)},
operation="add",
)
else:
# Handle regular file uploads
# Validate file types before processing
supported_extensions = [
".txt",
".pdf",
".docx",
".md",
".csv",
".json",
".py",
".js",
".ts",
]
return add_run.model_dump()
except Exception as error:
return JSONResponse(status_code=409, content={"error": str(error)})
for file in data:
if file.filename:
file_ext = os.path.splitext(file.filename)[1].lower()
if file_ext and file_ext not in supported_extensions:
raise UnsupportedFileFormatError(
file_path=file.filename, supported_formats=supported_extensions
)
# Process the files
result = await cognee_add(data, datasetName, user=user, dataset_id=datasetId)
logger.info(
"Successfully added data to dataset",
extra={"dataset_name": datasetName, "dataset_id": datasetId, "user_id": user.id},
)
return result.model_dump() if hasattr(result, "model_dump") else result
return router

View file

@ -24,6 +24,13 @@ from cognee.modules.pipelines.queues.pipeline_run_info_queues import (
remove_queue,
)
from cognee.shared.logging_utils import get_logger
from cognee.exceptions import (
CogneeValidationError,
EmptyDatasetError,
DatasetNotFoundError,
MissingAPIKeyError,
NoDataToProcessError,
)
logger = get_logger("api.cognify")
@ -66,8 +73,10 @@ def get_cognify_router() -> APIRouter:
- **Background execution**: Pipeline run metadata including pipeline_run_id for status monitoring via WebSocket subscription
## Error Codes
- **400 Bad Request**: When neither datasets nor dataset_ids are provided, or when specified datasets don't exist
- **409 Conflict**: When processing fails due to system errors, missing LLM API keys, database connection failures, or corrupted content
- **400 Bad Request**: Missing required parameters or invalid input
- **422 Unprocessable Entity**: No data to process or validation errors
- **404 Not Found**: Specified datasets don't exist
- **500 Internal Server Error**: System errors, missing API keys, database connection failures
## Example Request
```json
@ -84,23 +93,53 @@ def get_cognify_router() -> APIRouter:
## Next Steps
After successful processing, use the search endpoints to query the generated knowledge graph for insights, relationships, and semantic search.
"""
# Input validation with enhanced exceptions
if not payload.datasets and not payload.dataset_ids:
return JSONResponse(
status_code=400, content={"error": "No datasets or dataset_ids provided"}
raise CogneeValidationError(
message="No datasets or dataset_ids provided",
user_message="You must specify which datasets to process.",
suggestions=[
"Provide dataset names using the 'datasets' parameter",
"Provide dataset UUIDs using the 'dataset_ids' parameter",
"Use cognee.datasets() to see available datasets",
],
docs_link="https://docs.cognee.ai/api/cognify",
context={
"provided_datasets": payload.datasets,
"provided_dataset_ids": payload.dataset_ids,
},
operation="cognify",
)
# Check for LLM API key early to provide better error messaging
llm_api_key = os.getenv("LLM_API_KEY")
if not llm_api_key:
raise MissingAPIKeyError(service="LLM", env_var="LLM_API_KEY")
from cognee.api.v1.cognify import cognify as cognee_cognify
try:
datasets = payload.dataset_ids if payload.dataset_ids else payload.datasets
datasets = payload.dataset_ids if payload.dataset_ids else payload.datasets
cognify_run = await cognee_cognify(
datasets, user, run_in_background=payload.run_in_background
)
logger.info(
f"Starting cognify process for user {user.id}",
extra={
"user_id": user.id,
"datasets": datasets,
"run_in_background": payload.run_in_background,
},
)
return cognify_run
except Exception as error:
return JSONResponse(status_code=409, content={"error": str(error)})
# The enhanced exception handler will catch and format any errors from cognee_cognify
cognify_run = await cognee_cognify(
datasets, user, run_in_background=payload.run_in_background
)
logger.info(
f"Cognify process completed for user {user.id}",
extra={"user_id": user.id, "datasets": datasets},
)
return cognify_run
@router.websocket("/subscribe/{pipeline_run_id}")
async def subscribe_to_cognify_info(websocket: WebSocket, pipeline_run_id: str):
@ -135,31 +174,43 @@ def get_cognify_router() -> APIRouter:
initialize_queue(pipeline_run_id)
while True:
pipeline_run_info = get_from_queue(pipeline_run_id)
try:
# If the pipeline is already completed, send the completion status
if isinstance(pipeline_run, PipelineRunCompleted):
graph_data = await get_formatted_graph_data()
pipeline_run.payload = {
"nodes": graph_data.get("nodes", []),
"edges": graph_data.get("edges", []),
}
if not pipeline_run_info:
await asyncio.sleep(2)
continue
await websocket.send_json(pipeline_run.model_dump())
await websocket.close(code=WS_1000_NORMAL_CLOSURE)
return
if not isinstance(pipeline_run_info, PipelineRunInfo):
continue
# Stream pipeline updates
while True:
try:
pipeline_run_info = await asyncio.wait_for(
get_from_queue(pipeline_run_id), timeout=10.0
)
try:
await websocket.send_json(
{
"pipeline_run_id": str(pipeline_run_info.pipeline_run_id),
"status": pipeline_run_info.status,
"payload": await get_formatted_graph_data(pipeline_run.dataset_id, user.id),
}
)
if pipeline_run_info:
await websocket.send_json(pipeline_run_info.model_dump())
if isinstance(pipeline_run_info, PipelineRunCompleted):
remove_queue(pipeline_run_id)
await websocket.close(code=WS_1000_NORMAL_CLOSURE)
if isinstance(pipeline_run_info, PipelineRunCompleted):
break
except asyncio.TimeoutError:
# Send a heartbeat to keep the connection alive
await websocket.send_json({"type": "heartbeat"})
except Exception as e:
logger.error(f"Error in WebSocket communication: {str(e)}")
break
except WebSocketDisconnect:
remove_queue(pipeline_run_id)
break
except WebSocketDisconnect:
logger.info(f"WebSocket disconnected for pipeline {pipeline_run_id}")
except Exception as error:
logger.error(f"WebSocket error: {str(error)}")
finally:
remove_queue(pipeline_run_id)
return router

View file

@ -1,22 +1,21 @@
from uuid import UUID
from typing import Optional
from datetime import datetime
from fastapi import Depends, APIRouter
from typing import List, Optional
from fastapi import APIRouter, Depends
from fastapi.responses import JSONResponse
from cognee.api.DTO import InDTO
from cognee.modules.search.types import SearchType
from cognee.api.DTO import InDTO, OutDTO
from cognee.modules.users.exceptions.exceptions import PermissionDeniedError
from cognee.modules.users.models import User
from cognee.modules.search.operations import get_history
from cognee.modules.users.methods import get_authenticated_user
from cognee.modules.users.models import User
from cognee.modules.users.exceptions import PermissionDeniedError
from cognee.modules.data.methods import get_history
from cognee.exceptions import UnsupportedSearchTypeError, InvalidQueryError, NoDataToProcessError
# Note: Datasets sent by name will only map to datasets owned by the request sender
# To search for datasets not owned by the request sender dataset UUID is needed
class SearchPayloadDTO(InDTO):
search_type: SearchType
datasets: Optional[list[str]] = None
dataset_ids: Optional[list[UUID]] = None
datasets: Optional[List[str]] = None
dataset_ids: Optional[List[UUID]] = None
query: str
top_k: Optional[int] = 10
@ -24,36 +23,23 @@ class SearchPayloadDTO(InDTO):
def get_search_router() -> APIRouter:
router = APIRouter()
class SearchHistoryItem(OutDTO):
id: UUID
text: str
user: str
created_at: datetime
@router.get("", response_model=list[SearchHistoryItem])
@router.get("/history", response_model=list)
async def get_search_history(user: User = Depends(get_authenticated_user)):
"""
Get search history for the authenticated user.
This endpoint retrieves the search history for the authenticated user,
returning a list of previously executed searches with their timestamps.
This endpoint retrieves the search history for the current user,
showing previous queries and their results.
## Response
Returns a list of search history items containing:
- **id**: Unique identifier for the search
- **text**: The search query text
- **user**: User who performed the search
- **created_at**: When the search was performed
Returns a list of historical search queries and their metadata.
## Error Codes
- **500 Internal Server Error**: Error retrieving search history
- **500 Internal Server Error**: Database or system error while retrieving history
"""
try:
history = await get_history(user.id, limit=0)
return history
except Exception as error:
return JSONResponse(status_code=500, content={"error": str(error)})
# Remove try-catch to let enhanced exception handler deal with it
history = await get_history(user.id, limit=0)
return history
@router.post("", response_model=list)
async def search(payload: SearchPayloadDTO, user: User = Depends(get_authenticated_user)):
@ -75,30 +61,51 @@ def get_search_router() -> APIRouter:
Returns a list of search results containing relevant nodes from the graph.
## Error Codes
- **409 Conflict**: Error during search operation
- **403 Forbidden**: User doesn't have permission to search datasets (returns empty list)
- **400 Bad Request**: Invalid query or search parameters
- **404 Not Found**: No data found to search
- **422 Unprocessable Entity**: Unsupported search type
- **403 Forbidden**: User doesn't have permission to search datasets
- **500 Internal Server Error**: System error during search
## Notes
- Datasets sent by name will only map to datasets owned by the request sender
- To search datasets not owned by the request sender, dataset UUID is needed
- If permission is denied, returns empty list instead of error
- Enhanced error messages provide actionable suggestions for fixing issues
"""
from cognee.api.v1.search import search as cognee_search
try:
results = await cognee_search(
query_text=payload.query,
query_type=payload.search_type,
user=user,
datasets=payload.datasets,
dataset_ids=payload.dataset_ids,
top_k=payload.top_k,
# Input validation with enhanced exceptions
if not payload.query or not payload.query.strip():
raise InvalidQueryError(query=payload.query or "", reason="Query cannot be empty")
if len(payload.query.strip()) < 2:
raise InvalidQueryError(
query=payload.query, reason="Query must be at least 2 characters long"
)
return results
except PermissionDeniedError:
# Check if search type is supported
try:
search_type = payload.search_type
except ValueError:
raise UnsupportedSearchTypeError(
search_type=str(payload.search_type), supported_types=[t.value for t in SearchType]
)
# Permission denied errors will be caught and handled by the enhanced exception handler
# Other exceptions will also be properly formatted by the global handler
results = await cognee_search(
query_text=payload.query,
query_type=payload.search_type,
user=user,
datasets=payload.datasets,
dataset_ids=payload.dataset_ids,
top_k=payload.top_k,
)
# If no results found, that's not necessarily an error, just return empty list
if not results:
return []
except Exception as error:
return JSONResponse(status_code=409, content={"error": str(error)})
return results
return router

View file

@ -1,10 +1,11 @@
"""
Custom exceptions for the Cognee API.
This module defines a set of exceptions for handling various application errors,
such as service failures, resource conflicts, and invalid operations.
This module defines a comprehensive set of exceptions for handling various application errors,
with enhanced error context, user-friendly messages, and actionable suggestions.
"""
# Import original exceptions for backward compatibility
from .exceptions import (
CogneeApiError,
ServiceError,
@ -12,3 +13,83 @@ from .exceptions import (
InvalidAttributeError,
CriticalError,
)
# Import enhanced exception hierarchy
from .enhanced_exceptions import (
CogneeBaseError,
CogneeUserError,
CogneeSystemError,
CogneeTransientError,
CogneeConfigurationError,
CogneeValidationError,
CogneeAuthenticationError,
CogneePermissionError,
CogneeNotFoundError,
CogneeRateLimitError,
)
# Import domain-specific exceptions
from .domain_exceptions import (
# Data/Input Errors
UnsupportedFileFormatError,
EmptyDatasetError,
DatasetNotFoundError,
InvalidQueryError,
FileAccessError,
# Processing Errors
LLMConnectionError,
LLMRateLimitError,
ProcessingTimeoutError,
DatabaseConnectionError,
InsufficientResourcesError,
# Configuration Errors
MissingAPIKeyError,
InvalidDatabaseConfigError,
UnsupportedSearchTypeError,
# Pipeline Errors
PipelineExecutionError,
DataExtractionError,
NoDataToProcessError,
)
# For backward compatibility, create aliases
# These will allow existing code to continue working while we migrate
DatasetNotFoundError_Legacy = InvalidValueError # For existing dataset not found errors
PermissionDeniedError_Legacy = CogneeApiError # For existing permission errors
__all__ = [
# Original exceptions (backward compatibility)
"CogneeApiError",
"ServiceError",
"InvalidValueError",
"InvalidAttributeError",
"CriticalError",
# Enhanced base exceptions
"CogneeBaseError",
"CogneeUserError",
"CogneeSystemError",
"CogneeTransientError",
"CogneeConfigurationError",
"CogneeValidationError",
"CogneeAuthenticationError",
"CogneePermissionError",
"CogneeNotFoundError",
"CogneeRateLimitError",
# Domain-specific exceptions
"UnsupportedFileFormatError",
"EmptyDatasetError",
"DatasetNotFoundError",
"InvalidQueryError",
"FileAccessError",
"LLMConnectionError",
"LLMRateLimitError",
"ProcessingTimeoutError",
"DatabaseConnectionError",
"InsufficientResourcesError",
"MissingAPIKeyError",
"InvalidDatabaseConfigError",
"UnsupportedSearchTypeError",
"PipelineExecutionError",
"DataExtractionError",
"NoDataToProcessError",
]

View file

@ -0,0 +1,337 @@
from typing import List, Optional, Dict, Any
from .enhanced_exceptions import (
CogneeUserError,
CogneeSystemError,
CogneeTransientError,
CogneeConfigurationError,
CogneeValidationError,
CogneeNotFoundError,
CogneePermissionError,
)
# ========== DATA/INPUT ERRORS (User-fixable) ==========
class UnsupportedFileFormatError(CogneeValidationError):
"""File format not supported by Cognee"""
def __init__(self, file_path: str, supported_formats: List[str], **kwargs):
super().__init__(
message=f"File format not supported: {file_path}",
user_message=f"The file '{file_path}' has an unsupported format.",
suggestions=[
f"Use one of these supported formats: {', '.join(supported_formats)}",
"Convert your file to a supported format",
"Check our documentation for the complete list of supported formats",
],
docs_link="https://docs.cognee.ai/guides/file-formats",
context={"file_path": file_path, "supported_formats": supported_formats},
operation="add",
**kwargs,
)
class EmptyDatasetError(CogneeValidationError):
"""Dataset is empty or contains no processable content"""
def __init__(self, dataset_name: str, **kwargs):
super().__init__(
message=f"Dataset '{dataset_name}' is empty",
user_message=f"The dataset '{dataset_name}' contains no data to process.",
suggestions=[
"Add some data to the dataset first using cognee.add()",
"Check if your files contain readable text content",
"Verify that your data was uploaded successfully",
],
docs_link="https://docs.cognee.ai/guides/adding-data",
context={"dataset_name": dataset_name},
operation="cognify",
**kwargs,
)
class DatasetNotFoundError(CogneeNotFoundError):
"""Dataset not found or not accessible"""
def __init__(
self, dataset_identifier: str, available_datasets: Optional[List[str]] = None, **kwargs
):
suggestions = ["Check the dataset name for typos"]
if available_datasets:
suggestions.extend(
[
f"Available datasets: {', '.join(available_datasets)}",
"Use cognee.datasets() to see all your datasets",
]
)
else:
suggestions.append("Create the dataset first by adding data to it")
super().__init__(
message=f"Dataset not found: {dataset_identifier}",
user_message=f"Could not find dataset '{dataset_identifier}'.",
suggestions=suggestions,
docs_link="https://docs.cognee.ai/guides/datasets",
context={
"dataset_identifier": dataset_identifier,
"available_datasets": available_datasets,
},
**kwargs,
)
class InvalidQueryError(CogneeValidationError):
"""Search query is invalid or malformed"""
def __init__(self, query: str, reason: str, **kwargs):
super().__init__(
message=f"Invalid query: {reason}",
user_message=f"Your search query '{query}' is invalid: {reason}",
suggestions=[
"Try rephrasing your query",
"Use simpler, more specific terms",
"Check our query examples in the documentation",
],
docs_link="https://docs.cognee.ai/guides/search",
context={"query": query, "reason": reason},
operation="search",
**kwargs,
)
class FileAccessError(CogneeUserError):
"""Cannot access or read the specified file"""
def __init__(self, file_path: str, reason: str, **kwargs):
super().__init__(
message=f"Cannot access file: {file_path} - {reason}",
user_message=f"Unable to read the file '{file_path}': {reason}",
suggestions=[
"Check if the file exists at the specified path",
"Verify you have read permissions for the file",
"Ensure the file is not locked by another application",
],
context={"file_path": file_path, "reason": reason},
operation="add",
**kwargs,
)
# ========== PROCESSING ERRORS (System/LLM errors) ==========
class LLMConnectionError(CogneeTransientError):
"""LLM service connection failure"""
def __init__(self, provider: str, model: str, reason: str, **kwargs):
super().__init__(
message=f"LLM connection failed: {provider}/{model} - {reason}",
user_message=f"Cannot connect to the {provider} language model service.",
suggestions=[
"Check your internet connection",
"Verify your API key is correct and has sufficient credits",
"Try again in a few moments",
"Check the service status page",
],
docs_link="https://docs.cognee.ai/troubleshooting/llm-connection",
context={"provider": provider, "model": model, "reason": reason},
**kwargs,
)
class LLMRateLimitError(CogneeTransientError):
"""LLM service rate limit exceeded"""
def __init__(self, provider: str, retry_after: Optional[int] = None, **kwargs):
suggestions = [
"Wait a moment before retrying",
"Consider upgrading your API plan",
"Use smaller batch sizes to reduce token usage",
]
if retry_after:
suggestions.insert(0, f"Wait {retry_after} seconds before retrying")
super().__init__(
message=f"Rate limit exceeded for {provider}",
user_message=f"You've exceeded the rate limit for {provider}.",
suggestions=suggestions,
context={"provider": provider, "retry_after": retry_after},
**kwargs,
)
class ProcessingTimeoutError(CogneeTransientError):
"""Processing operation timed out"""
def __init__(self, operation: str, timeout_seconds: int, **kwargs):
super().__init__(
message=f"Operation '{operation}' timed out after {timeout_seconds}s",
user_message=f"The {operation} operation took too long and was cancelled.",
suggestions=[
"Try processing smaller amounts of data at a time",
"Check your internet connection stability",
"Retry the operation",
"Use background processing for large datasets",
],
context={"operation": operation, "timeout_seconds": timeout_seconds},
**kwargs,
)
class DatabaseConnectionError(CogneeSystemError):
"""Database connection failure"""
def __init__(self, db_type: str, reason: str, **kwargs):
super().__init__(
message=f"{db_type} database connection failed: {reason}",
user_message=f"Cannot connect to the {db_type} database.",
suggestions=[
"Check if the database service is running",
"Verify database connection configuration",
"Check network connectivity",
"Contact support if the issue persists",
],
docs_link="https://docs.cognee.ai/troubleshooting/database",
context={"db_type": db_type, "reason": reason},
**kwargs,
)
class InsufficientResourcesError(CogneeSystemError):
"""System has insufficient resources to complete the operation"""
def __init__(self, resource_type: str, required: str, available: str, **kwargs):
super().__init__(
message=f"Insufficient {resource_type}: need {required}, have {available}",
user_message=f"Not enough {resource_type} available to complete this operation.",
suggestions=[
"Try processing smaller amounts of data",
"Free up system resources",
"Wait for other operations to complete",
"Consider upgrading your system resources",
],
context={"resource_type": resource_type, "required": required, "available": available},
**kwargs,
)
# ========== CONFIGURATION ERRORS ==========
class MissingAPIKeyError(CogneeConfigurationError):
"""Required API key is missing"""
def __init__(self, service: str, env_var: str, **kwargs):
super().__init__(
message=f"Missing API key for {service}",
user_message=f"API key for {service} is not configured.",
suggestions=[
f"Set the {env_var} environment variable",
f"Add your {service} API key to your .env file",
"Check the setup documentation for detailed instructions",
],
docs_link="https://docs.cognee.ai/setup/api-keys",
context={"service": service, "env_var": env_var},
**kwargs,
)
class InvalidDatabaseConfigError(CogneeConfigurationError):
"""Database configuration is invalid"""
def __init__(self, db_type: str, config_issue: str, **kwargs):
super().__init__(
message=f"Invalid {db_type} database configuration: {config_issue}",
user_message=f"The {db_type} database is not properly configured: {config_issue}",
suggestions=[
"Check your database configuration settings",
"Verify connection strings and credentials",
"Review the database setup documentation",
"Ensure the database server is accessible",
],
docs_link="https://docs.cognee.ai/setup/databases",
context={"db_type": db_type, "config_issue": config_issue},
**kwargs,
)
class UnsupportedSearchTypeError(CogneeValidationError):
"""Search type is not supported"""
def __init__(self, search_type: str, supported_types: List[str], **kwargs):
super().__init__(
message=f"Unsupported search type: {search_type}",
user_message=f"The search type '{search_type}' is not supported.",
suggestions=[
f"Use one of these supported search types: {', '.join(supported_types)}",
"Check the search documentation for available types",
"Try using GRAPH_COMPLETION for general queries",
],
docs_link="https://docs.cognee.ai/guides/search-types",
context={"search_type": search_type, "supported_types": supported_types},
operation="search",
**kwargs,
)
# ========== PIPELINE ERRORS ==========
class PipelineExecutionError(CogneeSystemError):
"""Pipeline execution failed"""
def __init__(self, pipeline_name: str, task_name: str, error_details: str, **kwargs):
super().__init__(
message=f"Pipeline '{pipeline_name}' failed at task '{task_name}': {error_details}",
user_message=f"Processing failed during the {task_name} step.",
suggestions=[
"Check the logs for more detailed error information",
"Verify your data is in a supported format",
"Try processing smaller amounts of data",
"Contact support if the issue persists",
],
context={
"pipeline_name": pipeline_name,
"task_name": task_name,
"error_details": error_details,
},
**kwargs,
)
class DataExtractionError(CogneeSystemError):
"""Failed to extract content from data"""
def __init__(self, source: str, reason: str, **kwargs):
super().__init__(
message=f"Data extraction failed for {source}: {reason}",
user_message=f"Could not extract readable content from '{source}'.",
suggestions=[
"Verify the file is not corrupted",
"Try converting to a different format",
"Check if the file contains readable text",
"Use a supported file format",
],
context={"source": source, "reason": reason},
operation="add",
**kwargs,
)
class NoDataToProcessError(CogneeValidationError):
"""No data available to process"""
def __init__(self, operation: str, **kwargs):
super().__init__(
message=f"No data available for {operation}",
user_message=f"There's no data to process for the {operation} operation.",
suggestions=[
"Add some data first using cognee.add()",
"Check if your previous data upload was successful",
"Verify the dataset contains processable content",
],
docs_link="https://docs.cognee.ai/guides/adding-data",
context={"operation": operation},
**kwargs,
)

View file

@ -0,0 +1,184 @@
from typing import Dict, List, Optional, Any
from fastapi import status
from cognee.shared.logging_utils import get_logger
logger = get_logger()
class CogneeBaseError(Exception):
"""
Base exception for all Cognee errors with enhanced context and user experience.
This class provides a foundation for all Cognee exceptions with:
- Rich error context
- User-friendly messages
- Actionable suggestions
- Documentation links
- Retry information
"""
def __init__(
self,
message: str,
user_message: Optional[str] = None,
suggestions: Optional[List[str]] = None,
docs_link: Optional[str] = None,
context: Optional[Dict[str, Any]] = None,
is_retryable: bool = False,
status_code: int = status.HTTP_500_INTERNAL_SERVER_ERROR,
log_level: str = "ERROR",
operation: Optional[str] = None,
):
self.message = message
self.user_message = user_message or message
self.suggestions = suggestions or []
self.docs_link = docs_link
self.context = context or {}
self.is_retryable = is_retryable
self.status_code = status_code
self.operation = operation
# Automatically log the exception
if log_level == "ERROR":
logger.error(f"CogneeError in {operation or 'unknown'}: {message}", extra=self.context)
elif log_level == "WARNING":
logger.warning(
f"CogneeWarning in {operation or 'unknown'}: {message}", extra=self.context
)
elif log_level == "INFO":
logger.info(f"CogneeInfo in {operation or 'unknown'}: {message}", extra=self.context)
super().__init__(self.message)
def __str__(self):
return f"{self.__class__.__name__}: {self.message}"
def to_dict(self) -> Dict[str, Any]:
"""Convert exception to dictionary for API responses"""
return {
"type": self.__class__.__name__,
"message": self.user_message,
"technical_message": self.message,
"suggestions": self.suggestions,
"docs_link": self.docs_link,
"is_retryable": self.is_retryable,
"context": self.context,
"operation": self.operation,
}
class CogneeUserError(CogneeBaseError):
"""
User-fixable errors (4xx status codes).
These are errors caused by user input or actions that can be corrected
by the user. Examples: invalid file format, missing required field.
"""
def __init__(self, *args, **kwargs):
kwargs.setdefault("status_code", status.HTTP_400_BAD_REQUEST)
kwargs.setdefault("log_level", "WARNING")
super().__init__(*args, **kwargs)
class CogneeSystemError(CogneeBaseError):
"""
System/infrastructure errors (5xx status codes).
These are errors caused by system issues that require technical intervention.
Examples: database connection failure, service unavailable.
"""
def __init__(self, *args, **kwargs):
kwargs.setdefault("status_code", status.HTTP_500_INTERNAL_SERVER_ERROR)
kwargs.setdefault("log_level", "ERROR")
super().__init__(*args, **kwargs)
class CogneeTransientError(CogneeBaseError):
"""
Temporary/retryable errors.
These are errors that might succeed if retried, often due to temporary
resource constraints or network issues.
"""
def __init__(self, *args, **kwargs):
kwargs.setdefault("status_code", status.HTTP_503_SERVICE_UNAVAILABLE)
kwargs.setdefault("is_retryable", True)
kwargs.setdefault("log_level", "WARNING")
super().__init__(*args, **kwargs)
class CogneeConfigurationError(CogneeBaseError):
"""
Setup/configuration errors.
These are errors related to missing or invalid configuration that
prevent the system from operating correctly.
"""
def __init__(self, *args, **kwargs):
kwargs.setdefault("status_code", status.HTTP_422_UNPROCESSABLE_ENTITY)
kwargs.setdefault("log_level", "ERROR")
super().__init__(*args, **kwargs)
class CogneeValidationError(CogneeUserError):
"""
Input validation errors.
Specific type of user error for invalid input data.
"""
def __init__(self, *args, **kwargs):
kwargs.setdefault("status_code", status.HTTP_422_UNPROCESSABLE_ENTITY)
super().__init__(*args, **kwargs)
class CogneeAuthenticationError(CogneeUserError):
"""
Authentication and authorization errors.
"""
def __init__(self, *args, **kwargs):
kwargs.setdefault("status_code", status.HTTP_401_UNAUTHORIZED)
super().__init__(*args, **kwargs)
class CogneePermissionError(CogneeUserError):
"""
Permission denied errors.
"""
def __init__(self, *args, **kwargs):
kwargs.setdefault("status_code", status.HTTP_403_FORBIDDEN)
super().__init__(*args, **kwargs)
class CogneeNotFoundError(CogneeUserError):
"""
Resource not found errors.
"""
def __init__(self, *args, **kwargs):
kwargs.setdefault("status_code", status.HTTP_404_NOT_FOUND)
super().__init__(*args, **kwargs)
class CogneeRateLimitError(CogneeTransientError):
"""
Rate limiting errors.
"""
def __init__(self, *args, **kwargs):
kwargs.setdefault("status_code", status.HTTP_429_TOO_MANY_REQUESTS)
kwargs.setdefault(
"suggestions",
[
"Wait a moment before retrying",
"Check your API rate limits",
"Consider using smaller batch sizes",
],
)
super().__init__(*args, **kwargs)

View file

@ -2,6 +2,14 @@ import inspect
from cognee.shared.logging_utils import get_logger
from cognee.modules.users.models import User
from cognee.shared.utils import send_telemetry
from cognee.exceptions import (
PipelineExecutionError,
CogneeTransientError,
CogneeSystemError,
CogneeUserError,
LLMConnectionError,
DatabaseConnectionError,
)
from ..tasks.task import Task
@ -16,15 +24,33 @@ async def handle_task(
user: User,
context: dict = None,
):
"""Handle common task workflow with logging, telemetry, and error handling around the core execution logic."""
task_type = running_task.task_type
"""
Handle common task workflow with enhanced error handling and recovery strategies.
This function provides comprehensive error handling for pipeline tasks with:
- Context-aware error reporting
- Automatic retry for transient errors
- Detailed error logging and telemetry
- User-friendly error messages
"""
task_type = running_task.task_type
task_name = running_task.executable.__name__
logger.info(
f"{task_type} task started: `{task_name}`",
extra={
"task_type": task_type,
"task_name": task_name,
"user_id": user.id,
"context": context,
},
)
logger.info(f"{task_type} task started: `{running_task.executable.__name__}`")
send_telemetry(
f"{task_type} Task Started",
user_id=user.id,
additional_properties={
"task_name": running_task.executable.__name__,
"task_name": task_name,
},
)
@ -35,36 +61,151 @@ async def handle_task(
if has_context:
args.append(context)
try:
async for result_data in running_task.execute(args, next_task_batch_size):
async for result in run_tasks_base(leftover_tasks, result_data, user, context):
yield result
# Retry configuration for transient errors
max_retries = 3
retry_count = 0
logger.info(f"{task_type} task completed: `{running_task.executable.__name__}`")
send_telemetry(
f"{task_type} Task Completed",
user_id=user.id,
additional_properties={
"task_name": running_task.executable.__name__,
},
)
except Exception as error:
logger.error(
f"{task_type} task errored: `{running_task.executable.__name__}`\n{str(error)}\n",
exc_info=True,
)
send_telemetry(
f"{task_type} Task Errored",
user_id=user.id,
additional_properties={
"task_name": running_task.executable.__name__,
},
)
raise error
while retry_count <= max_retries:
try:
async for result_data in running_task.execute(args, next_task_batch_size):
async for result in run_tasks_base(leftover_tasks, result_data, user, context):
yield result
logger.info(
f"{task_type} task completed: `{task_name}`",
extra={
"task_type": task_type,
"task_name": task_name,
"user_id": user.id,
"retry_count": retry_count,
},
)
send_telemetry(
f"{task_type} Task Completed",
user_id=user.id,
additional_properties={
"task_name": task_name,
"retry_count": retry_count,
},
)
return # Success, exit retry loop
except CogneeTransientError as error:
retry_count += 1
if retry_count <= max_retries:
logger.warning(
f"Transient error in {task_type} task `{task_name}`, retrying ({retry_count}/{max_retries}): {error}",
extra={
"task_type": task_type,
"task_name": task_name,
"user_id": user.id,
"retry_count": retry_count,
"error_type": error.__class__.__name__,
},
)
# Exponential backoff for retries
import asyncio
await asyncio.sleep(2**retry_count)
continue
else:
# Max retries exceeded, raise enhanced error
raise PipelineExecutionError(
pipeline_name=f"{task_type}_pipeline",
task_name=task_name,
error_details=f"Max retries ({max_retries}) exceeded for transient error: {error}",
)
except (CogneeUserError, CogneeSystemError) as error:
# These errors shouldn't be retried, re-raise as pipeline execution error
logger.error(
f"{task_type} task failed: `{task_name}` - {error.__class__.__name__}: {error}",
extra={
"task_type": task_type,
"task_name": task_name,
"user_id": user.id,
"error_type": error.__class__.__name__,
"error_context": getattr(error, "context", {}),
},
exc_info=True,
)
send_telemetry(
f"{task_type} Task Errored",
user_id=user.id,
additional_properties={
"task_name": task_name,
"error_type": error.__class__.__name__,
},
)
# Wrap in pipeline execution error with additional context
raise PipelineExecutionError(
pipeline_name=f"{task_type}_pipeline",
task_name=task_name,
error_details=f"{error.__class__.__name__}: {error}",
context={
"original_error": error.__class__.__name__,
"original_context": getattr(error, "context", {}),
"user_id": user.id,
"task_args": str(args)[:200], # Truncate for logging
},
)
except Exception as error:
# Unexpected error, wrap in enhanced exception
logger.error(
f"{task_type} task encountered unexpected error: `{task_name}` - {error}",
extra={
"task_type": task_type,
"task_name": task_name,
"user_id": user.id,
"error_type": error.__class__.__name__,
},
exc_info=True,
)
send_telemetry(
f"{task_type} Task Errored",
user_id=user.id,
additional_properties={
"task_name": task_name,
"error_type": error.__class__.__name__,
},
)
# Check if this might be a known error type we can categorize
error_message = str(error).lower()
if any(term in error_message for term in ["connection", "timeout", "network"]):
if (
"llm" in error_message
or "openai" in error_message
or "anthropic" in error_message
):
raise LLMConnectionError(provider="Unknown", model="Unknown", reason=str(error))
elif "database" in error_message or "sql" in error_message:
raise DatabaseConnectionError(db_type="Unknown", reason=str(error))
# Default to pipeline execution error
raise PipelineExecutionError(
pipeline_name=f"{task_type}_pipeline",
task_name=task_name,
error_details=f"Unexpected error: {error}",
context={
"error_type": error.__class__.__name__,
"user_id": user.id,
"task_args": str(args)[:200], # Truncate for logging
},
)
async def run_tasks_base(tasks: list[Task], data=None, user: User = None, context: dict = None):
"""Base function to execute tasks in a pipeline, handling task type detection and execution."""
"""
Base function to execute tasks in a pipeline with enhanced error handling.
Provides comprehensive error handling, logging, and recovery strategies for pipeline execution.
"""
if len(tasks) == 0:
yield data
return