diff --git a/cognee/api/client.py b/cognee/api/client.py
index c504690e4..e630e0a91 100644
--- a/cognee/api/client.py
+++ b/cognee/api/client.py
@@ -15,6 +15,7 @@ from fastapi.exceptions import RequestValidationError
 from fastapi.openapi.utils import get_openapi
 from cognee.exceptions import CogneeApiError
+from cognee.exceptions.enhanced_exceptions import CogneeBaseError
 from cognee.shared.logging_utils import get_logger, setup_logging
 from cognee.api.v1.permissions.routers import get_permissions_router
 from cognee.api.v1.settings.routers import get_settings_router
@@ -120,8 +121,27 @@ async def request_validation_exception_handler(request: Request, exc: RequestVal
     )


+@app.exception_handler(CogneeBaseError)
+async def enhanced_exception_handler(_: Request, exc: CogneeBaseError) -> JSONResponse:
+    """
+    Enhanced exception handler for the new exception hierarchy.
+    Provides standardized error responses with rich context and user guidance.
+    """
+    # Log the full stack trace for debugging
+    logger.error(f"Enhanced exception caught: {exc.__class__.__name__}", exc_info=True)
+
+    # Create standardized error response
+    error_response = {"error": exc.to_dict()}
+
+    return JSONResponse(status_code=exc.status_code, content=error_response)
+
+
 @app.exception_handler(CogneeApiError)
-async def exception_handler(_: Request, exc: CogneeApiError) -> JSONResponse:
+async def legacy_exception_handler(_: Request, exc: CogneeApiError) -> JSONResponse:
+    """
+    Legacy exception handler for backward compatibility.
+    Handles old CogneeApiError instances with fallback formatting.
+    """
     detail = {}

     if exc.name and exc.message and exc.status_code:
@@ -136,7 +156,54 @@ async def exception_handler(_: Request, exc: CogneeApiError) -> JSONResponse:
     # log the stack trace for easier serverside debugging
     logger.error(format_exc())

-    return JSONResponse(status_code=status_code, content={"detail": detail["message"]})
+
+    # Convert to new format for consistency
+    error_response = {
+        "error": {
+            "type": exc.__class__.__name__,
+            "message": detail["message"],
+            "technical_message": detail["message"],
+            "suggestions": [
+                "Check the logs for more details",
+                "Try again or contact support if the issue persists",
+            ],
+            "docs_link": "https://docs.cognee.ai/troubleshooting",
+            "is_retryable": False,
+            "context": {},
+            "operation": None,
+        }
+    }
+
+    return JSONResponse(status_code=status_code, content=error_response)
+
+
+@app.exception_handler(Exception)
+async def global_exception_handler(request: Request, exc: Exception) -> JSONResponse:
+    """
+    Global exception handler for any unhandled exceptions.
+    Ensures all errors return a consistent format.
+    """
+    logger.error(f"Unhandled exception in {request.url.path}: {str(exc)}", exc_info=True)
+
+    # Create a standardized error response for unexpected errors
+    error_response = {
+        "error": {
+            "type": "UnexpectedError",
+            "message": "An unexpected error occurred. Please try again.",
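+            # Assumption: app_environment is the module-level environment flag already
+            # defined earlier in this file; outside "prod" the raw error text is surfaced.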
Please try again.", + "technical_message": str(exc) if app_environment != "prod" else "Internal server error", + "suggestions": [ + "Try your request again", + "Check if the issue persists", + "Contact support if the problem continues", + ], + "docs_link": "https://docs.cognee.ai/troubleshooting", + "is_retryable": True, + "context": {"path": str(request.url.path), "method": request.method}, + "operation": None, + } + } + + return JSONResponse(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, content=error_response) @app.get("/") diff --git a/cognee/api/v1/add/routers/get_add_router.py b/cognee/api/v1/add/routers/get_add_router.py index e18a5f322..0393250e1 100644 --- a/cognee/api/v1/add/routers/get_add_router.py +++ b/cognee/api/v1/add/routers/get_add_router.py @@ -11,6 +11,13 @@ import requests from cognee.modules.users.models import User from cognee.modules.users.methods import get_authenticated_user +from cognee.exceptions import ( + UnsupportedFileFormatError, + FileAccessError, + DatasetNotFoundError, + CogneeValidationError, + CogneeSystemError, +) logger = get_logger() @@ -49,49 +56,143 @@ def get_add_router() -> APIRouter: - Any relevant metadata from the ingestion process ## Error Codes - - **400 Bad Request**: Neither datasetId nor datasetName provided - - **409 Conflict**: Error during add operation + - **400 Bad Request**: Missing required parameters or invalid input + - **422 Unprocessable Entity**: Unsupported file format or validation error - **403 Forbidden**: User doesn't have permission to add to dataset + - **500 Internal Server Error**: System error during processing ## Notes - To add data to datasets not owned by the user, use dataset_id (when ENABLE_BACKEND_ACCESS_CONTROL is set to True) - GitHub repositories are cloned and all files are processed - HTTP URLs are fetched and their content is processed - The ALLOW_HTTP_REQUESTS environment variable controls URL processing + - Enhanced error messages provide specific guidance for fixing issues """ from cognee.api.v1.add import add as cognee_add + # Input validation with enhanced exceptions if not datasetId and not datasetName: - raise ValueError("Either datasetId or datasetName must be provided.") + raise CogneeValidationError( + message="Either datasetId or datasetName must be provided", + user_message="You must specify either a dataset name or dataset ID.", + suggestions=[ + "Provide a datasetName parameter (e.g., 'my_dataset')", + "Provide a datasetId parameter with a valid UUID", + "Check the API documentation for parameter examples", + ], + docs_link="https://docs.cognee.ai/api/add", + context={"provided_dataset_name": datasetName, "provided_dataset_id": datasetId}, + operation="add", + ) - try: - if ( - isinstance(data, str) - and data.startswith("http") - and (os.getenv("ALLOW_HTTP_REQUESTS", "true").lower() == "true") - ): - if "github" in data: + if not data or len(data) == 0: + raise CogneeValidationError( + message="No data provided for upload", + user_message="You must provide data to add to the dataset.", + suggestions=[ + "Upload one or more files", + "Provide a valid URL (if URL processing is enabled)", + "Check that your request includes the data parameter", + ], + docs_link="https://docs.cognee.ai/guides/adding-data", + operation="add", + ) + + logger.info( + f"Adding {len(data)} items to dataset", + extra={ + "dataset_name": datasetName, + "dataset_id": datasetId, + "user_id": user.id, + "item_count": len(data), + }, + ) + + # Handle URL-based data (GitHub repos, HTTP URLs) + if ( + len(data) == 1 + 
+            and hasattr(data[0], "filename")
+            and isinstance(data[0].filename, str)
+            and data[0].filename.startswith("http")
+            and (os.getenv("ALLOW_HTTP_REQUESTS", "true").lower() == "true")
+        ):
+            url = data[0].filename
+
+            if "github" in url:
+                try:
                     # Perform git clone if the URL is from GitHub
-                    repo_name = data.split("/")[-1].replace(".git", "")
-                    subprocess.run(["git", "clone", data, f".data/{repo_name}"], check=True)
+                    repo_name = url.split("/")[-1].replace(".git", "")
+                    subprocess.run(["git", "clone", url, f".data/{repo_name}"], check=True)
                     # TODO: Update add call with dataset info
-                    await cognee_add(
+                    result = await cognee_add(
                         "data://.data/",
                         f"{repo_name}",
                     )
+                except subprocess.CalledProcessError as e:
+                    raise CogneeSystemError(
+                        message=f"Failed to clone GitHub repository: {e}",
+                        user_message=f"Could not clone the GitHub repository '{url}'.",
+                        suggestions=[
+                            "Check if the repository URL is correct",
+                            "Verify the repository is public or you have access",
+                            "Try cloning the repository manually to test access",
+                        ],
+                        context={"url": url, "repo_name": repo_name, "error": str(e)},
+                        operation="add",
+                    )
-                else:
-                    # Fetch and store the data from other types of URL using curl
-                    response = requests.get(data)
+            else:
+                try:
+                    # Fetch and store the data from other types of URL
+                    response = requests.get(url, timeout=30)
                     response.raise_for_status()
-                    file_data = await response.content()
+                    file_data = response.content

                     # TODO: Update add call with dataset info
-                    return await cognee_add(file_data)
-        else:
-            add_run = await cognee_add(data, datasetName, user=user, dataset_id=datasetId)
+                    result = await cognee_add(file_data)
+                except requests.RequestException as e:
+                    raise CogneeSystemError(
+                        message=f"Failed to fetch URL: {e}",
+                        user_message=f"Could not fetch content from '{url}'.",
+                        suggestions=[
+                            "Check if the URL is accessible",
+                            "Verify your internet connection",
+                            "Try accessing the URL in a browser",
+                            "Check if the URL requires authentication",
+                        ],
+                        context={"url": url, "error": str(e)},
+                        operation="add",
+                    )
+        else:
+            # Handle regular file uploads
+            # Validate file types before processing
+            supported_extensions = [
+                ".txt",
+                ".pdf",
+                ".docx",
+                ".md",
+                ".csv",
+                ".json",
+                ".py",
+                ".js",
+                ".ts",
+            ]

-            return add_run.model_dump()
-        except Exception as error:
-            return JSONResponse(status_code=409, content={"error": str(error)})
+            for file in data:
+                if file.filename:
+                    file_ext = os.path.splitext(file.filename)[1].lower()
+                    if file_ext and file_ext not in supported_extensions:
+                        raise UnsupportedFileFormatError(
+                            file_path=file.filename, supported_formats=supported_extensions
+                        )
+
+            # Process the files
+            result = await cognee_add(data, datasetName, user=user, dataset_id=datasetId)
+
+        logger.info(
+            "Successfully added data to dataset",
+            extra={"dataset_name": datasetName, "dataset_id": datasetId, "user_id": user.id},
+        )
+
+        return result.model_dump() if hasattr(result, "model_dump") else result

     return router
diff --git a/cognee/api/v1/cognify/routers/get_cognify_router.py b/cognee/api/v1/cognify/routers/get_cognify_router.py
index 59162382d..1882a3fb4 100644
--- a/cognee/api/v1/cognify/routers/get_cognify_router.py
+++ b/cognee/api/v1/cognify/routers/get_cognify_router.py
@@ -24,6 +24,13 @@ from cognee.modules.pipelines.queues.pipeline_run_info_queues import (
     remove_queue,
 )
 from cognee.shared.logging_utils import get_logger
+from cognee.exceptions import (
+    CogneeValidationError,
+    EmptyDatasetError,
+    DatasetNotFoundError,
+    MissingAPIKeyError,
+    NoDataToProcessError,
+)

 logger = get_logger("api.cognify")
@@ -66,8 +73,10 @@ def get_cognify_router() -> APIRouter:
           - **Background execution**: Pipeline run metadata including pipeline_run_id for status monitoring via WebSocket subscription

         ## Error Codes
-        - **400 Bad Request**: When neither datasets nor dataset_ids are provided, or when specified datasets don't exist
-        - **409 Conflict**: When processing fails due to system errors, missing LLM API keys, database connection failures, or corrupted content
+        - **400 Bad Request**: Missing required parameters or invalid input
+        - **422 Unprocessable Entity**: No data to process or validation errors
+        - **404 Not Found**: Specified datasets don't exist
+        - **500 Internal Server Error**: System errors, missing API keys, database connection failures

         ## Example Request
         ```json
@@ -84,23 +93,53 @@ def get_cognify_router() -> APIRouter:
         ## Next Steps
         After successful processing, use the search endpoints to query the generated knowledge graph for insights, relationships, and semantic search.
         """
+        # Input validation with enhanced exceptions
         if not payload.datasets and not payload.dataset_ids:
-            return JSONResponse(
-                status_code=400, content={"error": "No datasets or dataset_ids provided"}
+            raise CogneeValidationError(
+                message="No datasets or dataset_ids provided",
+                user_message="You must specify which datasets to process.",
+                suggestions=[
+                    "Provide dataset names using the 'datasets' parameter",
+                    "Provide dataset UUIDs using the 'dataset_ids' parameter",
+                    "Use cognee.datasets() to see available datasets",
+                ],
+                docs_link="https://docs.cognee.ai/api/cognify",
+                context={
+                    "provided_datasets": payload.datasets,
+                    "provided_dataset_ids": payload.dataset_ids,
+                },
+                operation="cognify",
             )

+        # Check for LLM API key early to provide better error messaging
+        llm_api_key = os.getenv("LLM_API_KEY")
+        if not llm_api_key:
+            raise MissingAPIKeyError(service="LLM", env_var="LLM_API_KEY")
+
         from cognee.api.v1.cognify import cognify as cognee_cognify

-        try:
-            datasets = payload.dataset_ids if payload.dataset_ids else payload.datasets
+        datasets = payload.dataset_ids if payload.dataset_ids else payload.datasets

-            cognify_run = await cognee_cognify(
-                datasets, user, run_in_background=payload.run_in_background
-            )
+        logger.info(
+            f"Starting cognify process for user {user.id}",
+            extra={
+                "user_id": user.id,
+                "datasets": datasets,
+                "run_in_background": payload.run_in_background,
+            },
+        )

-            return cognify_run
-        except Exception as error:
-            return JSONResponse(status_code=409, content={"error": str(error)})
+        # The enhanced exception handler will catch and format any errors from cognee_cognify
+        cognify_run = await cognee_cognify(
+            datasets, user, run_in_background=payload.run_in_background
+        )
+
+        logger.info(
+            f"Cognify process completed for user {user.id}",
+            extra={"user_id": user.id, "datasets": datasets},
+        )
+
+        return cognify_run

     @router.websocket("/subscribe/{pipeline_run_id}")
     async def subscribe_to_cognify_info(websocket: WebSocket, pipeline_run_id: str):
@@ -135,31 +174,43 @@ def get_cognify_router() -> APIRouter:

         initialize_queue(pipeline_run_id)

-        while True:
-            pipeline_run_info = get_from_queue(pipeline_run_id)
+        try:
+            # If the pipeline is already completed, send the completion status
+            if isinstance(pipeline_run, PipelineRunCompleted):
+                graph_data = await get_formatted_graph_data(pipeline_run.dataset_id, user.id)
+                pipeline_run.payload = {
+                    "nodes": graph_data.get("nodes", []),
+                    "edges": graph_data.get("edges", []),
+                }

-            if not pipeline_run_info:
-                await asyncio.sleep(2)
-                continue
+                await websocket.send_json(pipeline_run.model_dump())
+                await websocket.close(code=WS_1000_NORMAL_CLOSURE)
+                return

-            if not isinstance(pipeline_run_info, PipelineRunInfo):
-                continue
+            # Stream pipeline updates
+            while True:
+                try:
+                    # get_from_queue is synchronous (see the removed code above), so run
+                    # it in a worker thread and bound the wait to allow heartbeats
+                    pipeline_run_info = await asyncio.wait_for(
+                        asyncio.to_thread(get_from_queue, pipeline_run_id), timeout=10.0
+                    )

-            try:
-                await websocket.send_json(
-                    {
-                        "pipeline_run_id": str(pipeline_run_info.pipeline_run_id),
-                        "status": pipeline_run_info.status,
-                        "payload": await get_formatted_graph_data(pipeline_run.dataset_id, user.id),
-                    }
-                )
+                    if pipeline_run_info:
+                        await websocket.send_json(pipeline_run_info.model_dump())

-                if isinstance(pipeline_run_info, PipelineRunCompleted):
-                    remove_queue(pipeline_run_id)
-                    await websocket.close(code=WS_1000_NORMAL_CLOSURE)
+                        if isinstance(pipeline_run_info, PipelineRunCompleted):
+                            break
+                    else:
+                        # Empty queue; a brief pause avoids a busy poll loop
+                        await asyncio.sleep(1)
+                except asyncio.TimeoutError:
+                    # Send a heartbeat to keep the connection alive
+                    await websocket.send_json({"type": "heartbeat"})
+                except Exception as e:
+                    logger.error(f"Error in WebSocket communication: {str(e)}")
                     break
-            except WebSocketDisconnect:
-                remove_queue(pipeline_run_id)
-                break
+
+        except WebSocketDisconnect:
+            logger.info(f"WebSocket disconnected for pipeline {pipeline_run_id}")
+        except Exception as error:
+            logger.error(f"WebSocket error: {str(error)}")
+        finally:
+            remove_queue(pipeline_run_id)

     return router
diff --git a/cognee/api/v1/search/routers/get_search_router.py b/cognee/api/v1/search/routers/get_search_router.py
index e63016187..37123dd3d 100644
--- a/cognee/api/v1/search/routers/get_search_router.py
+++ b/cognee/api/v1/search/routers/get_search_router.py
@@ -1,22 +1,21 @@
 from uuid import UUID
-from typing import Optional
-from datetime import datetime
-from fastapi import Depends, APIRouter
+from typing import List, Optional
+from fastapi import APIRouter, Depends
 from fastapi.responses import JSONResponse
+
+from cognee.api.DTO import InDTO
 from cognee.modules.search.types import SearchType
-from cognee.api.DTO import InDTO, OutDTO
-from cognee.modules.users.exceptions.exceptions import PermissionDeniedError
-from cognee.modules.users.models import User
-from cognee.modules.search.operations import get_history
 from cognee.modules.users.methods import get_authenticated_user
+from cognee.modules.users.models import User
+from cognee.modules.users.exceptions import PermissionDeniedError
+from cognee.modules.search.operations import get_history
+from cognee.exceptions import UnsupportedSearchTypeError, InvalidQueryError, NoDataToProcessError

-
-# Note: Datasets sent by name will only map to datasets owned by the request sender
-# To search for datasets not owned by the request sender dataset UUID is needed
 class SearchPayloadDTO(InDTO):
     search_type: SearchType
-    datasets: Optional[list[str]] = None
-    dataset_ids: Optional[list[UUID]] = None
+    datasets: Optional[List[str]] = None
+    dataset_ids: Optional[List[UUID]] = None
     query: str
     top_k: Optional[int] = 10
@@ -24,36 +23,23 @@ class SearchPayloadDTO(InDTO):
 def get_search_router() -> APIRouter:
     router = APIRouter()

-    class SearchHistoryItem(OutDTO):
-        id: UUID
-        text: str
-        user: str
-        created_at: datetime
-
-    @router.get("", response_model=list[SearchHistoryItem])
+    @router.get("/history", response_model=list)
     async def get_search_history(user: User = Depends(get_authenticated_user)):
         """
         Get search history for the authenticated user.

-        This endpoint retrieves the search history for the authenticated user,
-        returning a list of previously executed searches with their timestamps.
+        This endpoint retrieves the search history for the current user,
+        showing previous queries and their results.

         ## Response
-        Returns a list of search history items containing:
-        - **id**: Unique identifier for the search
-        - **text**: The search query text
-        - **user**: User who performed the search
-        - **created_at**: When the search was performed
+        Returns a list of historical search queries and their metadata.

         ## Error Codes
-        - **500 Internal Server Error**: Error retrieving search history
+        - **500 Internal Server Error**: Database or system error while retrieving history
         """
-        try:
-            history = await get_history(user.id, limit=0)
-
-            return history
-        except Exception as error:
-            return JSONResponse(status_code=500, content={"error": str(error)})
+        # No try-catch here: the enhanced exception handler deals with failures
+        history = await get_history(user.id, limit=0)
+        return history

     @router.post("", response_model=list)
     async def search(payload: SearchPayloadDTO, user: User = Depends(get_authenticated_user)):
@@ -75,30 +61,51 @@ def get_search_router() -> APIRouter:
         Returns a list of search results containing relevant nodes from the graph.

         ## Error Codes
-        - **409 Conflict**: Error during search operation
-        - **403 Forbidden**: User doesn't have permission to search datasets (returns empty list)
+        - **400 Bad Request**: Invalid query or search parameters
+        - **404 Not Found**: No data found to search
+        - **422 Unprocessable Entity**: Unsupported search type
+        - **403 Forbidden**: User doesn't have permission to search datasets
+        - **500 Internal Server Error**: System error during search

         ## Notes
         - Datasets sent by name will only map to datasets owned by the request sender
         - To search datasets not owned by the request sender, dataset UUID is needed
-        - If permission is denied, returns empty list instead of error
+        - Enhanced error messages provide actionable suggestions for fixing issues
         """
         from cognee.api.v1.search import search as cognee_search

-        try:
-            results = await cognee_search(
-                query_text=payload.query,
-                query_type=payload.search_type,
-                user=user,
-                datasets=payload.datasets,
-                dataset_ids=payload.dataset_ids,
-                top_k=payload.top_k,
+        # Input validation with enhanced exceptions
+        if not payload.query or not payload.query.strip():
+            raise InvalidQueryError(query=payload.query or "", reason="Query cannot be empty")
+
+        if len(payload.query.strip()) < 2:
+            raise InvalidQueryError(
+                query=payload.query, reason="Query must be at least 2 characters long"
             )

-            return results
-        except PermissionDeniedError:
+        # Pydantic already validates search_type against the SearchType enum at
+        # request parsing time; this defensive check guards internal callers that
+        # build the payload with a raw string value.
+        if not isinstance(payload.search_type, SearchType):
+            raise UnsupportedSearchTypeError(
+                search_type=str(payload.search_type), supported_types=[t.value for t in SearchType]
+            )
+
+        # Permission denied errors will be caught and handled by the enhanced exception handler
+        # Other exceptions will also be properly formatted by the global handler
+        results = await cognee_search(
+            query_text=payload.query,
+            query_type=payload.search_type,
+            user=user,
+            datasets=payload.datasets,
+            dataset_ids=payload.dataset_ids,
+            top_k=payload.top_k,
+        )
+
+        # If no results found, that's not necessarily an error, just return empty list
+        if not results:
             return []
-        except Exception as error:
-            return JSONResponse(status_code=409, content={"error": str(error)})
+
+        return results

     return router
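+
+
+# Example (sketch, illustrative payload): with the CogneeBaseError handler
+# registered in cognee/api/client.py, an empty query now yields a structured
+# 422 response instead of a bare 409 string:
+#
+#     {
+#         "error": {
+#             "type": "InvalidQueryError",
+#             "message": "Your search query '' is invalid: Query cannot be empty",
+#             "suggestions": ["Try rephrasing your query", "..."],
+#             "docs_link": "https://docs.cognee.ai/guides/search",
+#             "is_retryable": false,
+#             "operation": "search"
+#         }
+#     }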
diff --git a/cognee/exceptions/__init__.py b/cognee/exceptions/__init__.py
index d1d4ecbf5..30f2445db 100644
--- a/cognee/exceptions/__init__.py
+++ b/cognee/exceptions/__init__.py
@@ -1,10 +1,11 @@
 """
 Custom exceptions for the Cognee API.

-This module defines a set of exceptions for handling various application errors,
-such as service failures, resource conflicts, and invalid operations.
+This module defines a comprehensive set of exceptions for handling various application errors,
+with enhanced error context, user-friendly messages, and actionable suggestions.
 """

+# Import original exceptions for backward compatibility
 from .exceptions import (
     CogneeApiError,
     ServiceError,
@@ -12,3 +13,83 @@ from .exceptions import (
     InvalidAttributeError,
     CriticalError,
 )
+
+# Import enhanced exception hierarchy
+from .enhanced_exceptions import (
+    CogneeBaseError,
+    CogneeUserError,
+    CogneeSystemError,
+    CogneeTransientError,
+    CogneeConfigurationError,
+    CogneeValidationError,
+    CogneeAuthenticationError,
+    CogneePermissionError,
+    CogneeNotFoundError,
+    CogneeRateLimitError,
+)
+
+# Import domain-specific exceptions
+from .domain_exceptions import (
+    # Data/Input Errors
+    UnsupportedFileFormatError,
+    EmptyDatasetError,
+    DatasetNotFoundError,
+    InvalidQueryError,
+    FileAccessError,
+    # Processing Errors
+    LLMConnectionError,
+    LLMRateLimitError,
+    ProcessingTimeoutError,
+    DatabaseConnectionError,
+    InsufficientResourcesError,
+    # Configuration Errors
+    MissingAPIKeyError,
+    InvalidDatabaseConfigError,
+    UnsupportedSearchTypeError,
+    # Pipeline Errors
+    PipelineExecutionError,
+    DataExtractionError,
+    NoDataToProcessError,
+)
+
+# For backward compatibility, create aliases
+# These will allow existing code to continue working while we migrate
+DatasetNotFoundError_Legacy = InvalidValueError  # For existing dataset not found errors
+PermissionDeniedError_Legacy = CogneeApiError  # For existing permission errors
+
+__all__ = [
+    # Original exceptions (backward compatibility)
+    "CogneeApiError",
+    "ServiceError",
+    "InvalidValueError",
+    "InvalidAttributeError",
+    "CriticalError",
+    # Enhanced base exceptions
+    "CogneeBaseError",
+    "CogneeUserError",
+    "CogneeSystemError",
+    "CogneeTransientError",
+    "CogneeConfigurationError",
+    "CogneeValidationError",
+    "CogneeAuthenticationError",
+    "CogneePermissionError",
+    "CogneeNotFoundError",
+    "CogneeRateLimitError",
+    # Domain-specific exceptions
+    "UnsupportedFileFormatError",
+    "EmptyDatasetError",
+    "DatasetNotFoundError",
+    "InvalidQueryError",
+    "FileAccessError",
+    "LLMConnectionError",
+    "LLMRateLimitError",
+    "ProcessingTimeoutError",
+    "DatabaseConnectionError",
+    "InsufficientResourcesError",
+    "MissingAPIKeyError",
+    "InvalidDatabaseConfigError",
+    "UnsupportedSearchTypeError",
+    "PipelineExecutionError",
+    "DataExtractionError",
+    "NoDataToProcessError",
+]
diff --git a/cognee/exceptions/domain_exceptions.py b/cognee/exceptions/domain_exceptions.py
new file mode 100644
index 000000000..d7c1bdacd
--- /dev/null
+++ b/cognee/exceptions/domain_exceptions.py
@@ -0,0 +1,337 @@
+from typing import List, Optional, Dict, Any
+from .enhanced_exceptions import (
+    CogneeUserError,
+    CogneeSystemError,
+    CogneeTransientError,
+    CogneeConfigurationError,
+    CogneeValidationError,
+    CogneeNotFoundError,
+    CogneePermissionError,
+)
+
+
+# ========== DATA/INPUT ERRORS (User-fixable) ==========
+
+
+class UnsupportedFileFormatError(CogneeValidationError):
+    """File format not supported by Cognee"""
+
+    def __init__(self, file_path: str, supported_formats: List[str], **kwargs):
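+        # The extra **kwargs pass through to CogneeValidationError, so callers
+        # can override base options such as is_retryable or log_level if needed.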
+        super().__init__(
+            message=f"File format not supported: {file_path}",
+            user_message=f"The file '{file_path}' has an unsupported format.",
+            suggestions=[
+                f"Use one of these supported formats: {', '.join(supported_formats)}",
+                "Convert your file to a supported format",
+                "Check our documentation for the complete list of supported formats",
+            ],
+            docs_link="https://docs.cognee.ai/guides/file-formats",
+            context={"file_path": file_path, "supported_formats": supported_formats},
+            operation="add",
+            **kwargs,
+        )
+
+
+class EmptyDatasetError(CogneeValidationError):
+    """Dataset is empty or contains no processable content"""
+
+    def __init__(self, dataset_name: str, **kwargs):
+        super().__init__(
+            message=f"Dataset '{dataset_name}' is empty",
+            user_message=f"The dataset '{dataset_name}' contains no data to process.",
+            suggestions=[
+                "Add some data to the dataset first using cognee.add()",
+                "Check if your files contain readable text content",
+                "Verify that your data was uploaded successfully",
+            ],
+            docs_link="https://docs.cognee.ai/guides/adding-data",
+            context={"dataset_name": dataset_name},
+            operation="cognify",
+            **kwargs,
+        )
+
+
+class DatasetNotFoundError(CogneeNotFoundError):
+    """Dataset not found or not accessible"""
+
+    def __init__(
+        self, dataset_identifier: str, available_datasets: Optional[List[str]] = None, **kwargs
+    ):
+        suggestions = ["Check the dataset name for typos"]
+        if available_datasets:
+            suggestions.extend(
+                [
+                    f"Available datasets: {', '.join(available_datasets)}",
+                    "Use cognee.datasets() to see all your datasets",
+                ]
+            )
+        else:
+            suggestions.append("Create the dataset first by adding data to it")
+
+        super().__init__(
+            message=f"Dataset not found: {dataset_identifier}",
+            user_message=f"Could not find dataset '{dataset_identifier}'.",
+            suggestions=suggestions,
+            docs_link="https://docs.cognee.ai/guides/datasets",
+            context={
+                "dataset_identifier": dataset_identifier,
+                "available_datasets": available_datasets,
+            },
+            **kwargs,
+        )
+
+
+class InvalidQueryError(CogneeValidationError):
+    """Search query is invalid or malformed"""
+
+    def __init__(self, query: str, reason: str, **kwargs):
+        super().__init__(
+            message=f"Invalid query: {reason}",
+            user_message=f"Your search query '{query}' is invalid: {reason}",
+            suggestions=[
+                "Try rephrasing your query",
+                "Use simpler, more specific terms",
+                "Check our query examples in the documentation",
+            ],
+            docs_link="https://docs.cognee.ai/guides/search",
+            context={"query": query, "reason": reason},
+            operation="search",
+            **kwargs,
+        )
+
+
+class FileAccessError(CogneeUserError):
+    """Cannot access or read the specified file"""
+
+    def __init__(self, file_path: str, reason: str, **kwargs):
+        super().__init__(
+            message=f"Cannot access file: {file_path} - {reason}",
+            user_message=f"Unable to read the file '{file_path}': {reason}",
+            suggestions=[
+                "Check if the file exists at the specified path",
+                "Verify you have read permissions for the file",
+                "Ensure the file is not locked by another application",
+            ],
+            context={"file_path": file_path, "reason": reason},
+            operation="add",
+            **kwargs,
+        )
+
+
+# ========== PROCESSING ERRORS (System/LLM errors) ==========
+
+
+class LLMConnectionError(CogneeTransientError):
+    """LLM service connection failure"""
+
+    def __init__(self, provider: str, model: str, reason: str, **kwargs):
+        super().__init__(
+            message=f"LLM connection failed: {provider}/{model} - {reason}",
+            user_message=f"Cannot connect to the {provider} language model service.",
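+            # `reason` is kept in the technical message and the context below
+            # rather than in the user-facing message shown above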
+            suggestions=[
+                "Check your internet connection",
+                "Verify your API key is correct and has sufficient credits",
+                "Try again in a few moments",
+                "Check the service status page",
+            ],
+            docs_link="https://docs.cognee.ai/troubleshooting/llm-connection",
+            context={"provider": provider, "model": model, "reason": reason},
+            **kwargs,
+        )
+
+
+class LLMRateLimitError(CogneeTransientError):
+    """LLM service rate limit exceeded"""
+
+    def __init__(self, provider: str, retry_after: Optional[int] = None, **kwargs):
+        suggestions = [
+            "Wait a moment before retrying",
+            "Consider upgrading your API plan",
+            "Use smaller batch sizes to reduce token usage",
+        ]
+        if retry_after:
+            suggestions.insert(0, f"Wait {retry_after} seconds before retrying")
+
+        super().__init__(
+            message=f"Rate limit exceeded for {provider}",
+            user_message=f"You've exceeded the rate limit for {provider}.",
+            suggestions=suggestions,
+            context={"provider": provider, "retry_after": retry_after},
+            **kwargs,
+        )
+
+
+class ProcessingTimeoutError(CogneeTransientError):
+    """Processing operation timed out"""
+
+    def __init__(self, operation: str, timeout_seconds: int, **kwargs):
+        super().__init__(
+            message=f"Operation '{operation}' timed out after {timeout_seconds}s",
+            user_message=f"The {operation} operation took too long and was cancelled.",
+            suggestions=[
+                "Try processing smaller amounts of data at a time",
+                "Check your internet connection stability",
+                "Retry the operation",
+                "Use background processing for large datasets",
+            ],
+            context={"operation": operation, "timeout_seconds": timeout_seconds},
+            **kwargs,
+        )
+
+
+class DatabaseConnectionError(CogneeSystemError):
+    """Database connection failure"""
+
+    def __init__(self, db_type: str, reason: str, **kwargs):
+        super().__init__(
+            message=f"{db_type} database connection failed: {reason}",
+            user_message=f"Cannot connect to the {db_type} database.",
+            suggestions=[
+                "Check if the database service is running",
+                "Verify database connection configuration",
+                "Check network connectivity",
+                "Contact support if the issue persists",
+            ],
+            docs_link="https://docs.cognee.ai/troubleshooting/database",
+            context={"db_type": db_type, "reason": reason},
+            **kwargs,
+        )
+
+
+class InsufficientResourcesError(CogneeSystemError):
+    """System has insufficient resources to complete the operation"""
+
+    def __init__(self, resource_type: str, required: str, available: str, **kwargs):
+        super().__init__(
+            message=f"Insufficient {resource_type}: need {required}, have {available}",
+            user_message=f"Not enough {resource_type} available to complete this operation.",
+            suggestions=[
+                "Try processing smaller amounts of data",
+                "Free up system resources",
+                "Wait for other operations to complete",
+                "Consider upgrading your system resources",
+            ],
+            context={"resource_type": resource_type, "required": required, "available": available},
+            **kwargs,
+        )
+
+
+# ========== CONFIGURATION ERRORS ==========
+
+
+class MissingAPIKeyError(CogneeConfigurationError):
+    """Required API key is missing"""
+
+    def __init__(self, service: str, env_var: str, **kwargs):
+        super().__init__(
+            message=f"Missing API key for {service}",
+            user_message=f"API key for {service} is not configured.",
+            suggestions=[
+                f"Set the {env_var} environment variable",
+                f"Add your {service} API key to your .env file",
+                "Check the setup documentation for detailed instructions",
+            ],
+            docs_link="https://docs.cognee.ai/setup/api-keys",
+            context={"service": service, "env_var": env_var},
+            **kwargs,
+        )
+
+
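+# Example (sketch, illustrative values) of how a configuration error surfaces
+# through the API layer. With the CogneeBaseError handler registered in
+# cognee/api/client.py, this:
+#
+#     raise MissingAPIKeyError(service="LLM", env_var="LLM_API_KEY")
+#
+# produces an HTTP 422 response with body {"error": {"type": "MissingAPIKeyError",
+# "suggestions": ["Set the LLM_API_KEY environment variable", ...], ...}}.
+
+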
+class InvalidDatabaseConfigError(CogneeConfigurationError):
+    """Database configuration is invalid"""
+
+    def __init__(self, db_type: str, config_issue: str, **kwargs):
+        super().__init__(
+            message=f"Invalid {db_type} database configuration: {config_issue}",
+            user_message=f"The {db_type} database is not properly configured: {config_issue}",
+            suggestions=[
+                "Check your database configuration settings",
+                "Verify connection strings and credentials",
+                "Review the database setup documentation",
+                "Ensure the database server is accessible",
+            ],
+            docs_link="https://docs.cognee.ai/setup/databases",
+            context={"db_type": db_type, "config_issue": config_issue},
+            **kwargs,
+        )
+
+
+class UnsupportedSearchTypeError(CogneeValidationError):
+    """Search type is not supported"""
+
+    def __init__(self, search_type: str, supported_types: List[str], **kwargs):
+        super().__init__(
+            message=f"Unsupported search type: {search_type}",
+            user_message=f"The search type '{search_type}' is not supported.",
+            suggestions=[
+                f"Use one of these supported search types: {', '.join(supported_types)}",
+                "Check the search documentation for available types",
+                "Try using GRAPH_COMPLETION for general queries",
+            ],
+            docs_link="https://docs.cognee.ai/guides/search-types",
+            context={"search_type": search_type, "supported_types": supported_types},
+            operation="search",
+            **kwargs,
+        )
+
+
+# ========== PIPELINE ERRORS ==========
+
+
+class PipelineExecutionError(CogneeSystemError):
+    """Pipeline execution failed"""
+
+    def __init__(self, pipeline_name: str, task_name: str, error_details: str, **kwargs):
+        context = {
+            "pipeline_name": pipeline_name,
+            "task_name": task_name,
+            "error_details": error_details,
+        }
+        # Merge any caller-provided context instead of forwarding it via **kwargs,
+        # which would pass `context` twice and raise a TypeError
+        context.update(kwargs.pop("context", {}))
+        super().__init__(
+            message=f"Pipeline '{pipeline_name}' failed at task '{task_name}': {error_details}",
+            user_message=f"Processing failed during the {task_name} step.",
+            suggestions=[
+                "Check the logs for more detailed error information",
+                "Verify your data is in a supported format",
+                "Try processing smaller amounts of data",
+                "Contact support if the issue persists",
+            ],
+            context=context,
+            **kwargs,
+        )
+
+
+class DataExtractionError(CogneeSystemError):
+    """Failed to extract content from data"""
+
+    def __init__(self, source: str, reason: str, **kwargs):
+        super().__init__(
+            message=f"Data extraction failed for {source}: {reason}",
+            user_message=f"Could not extract readable content from '{source}'.",
+            suggestions=[
+                "Verify the file is not corrupted",
+                "Try converting to a different format",
+                "Check if the file contains readable text",
+                "Use a supported file format",
+            ],
+            context={"source": source, "reason": reason},
+            operation="add",
+            **kwargs,
+        )
+
+
+class NoDataToProcessError(CogneeValidationError):
+    """No data available to process"""
+
+    def __init__(self, operation: str, **kwargs):
+        super().__init__(
+            message=f"No data available for {operation}",
+            user_message=f"There's no data to process for the {operation} operation.",
+            suggestions=[
+                "Add some data first using cognee.add()",
+                "Check if your previous data upload was successful",
+                "Verify the dataset contains processable content",
+            ],
+            docs_link="https://docs.cognee.ai/guides/adding-data",
+            context={"operation": operation},
+            **kwargs,
+        )
diff --git a/cognee/exceptions/enhanced_exceptions.py b/cognee/exceptions/enhanced_exceptions.py
new file mode 100644
index 000000000..0bfa58f41
--- /dev/null
+++ b/cognee/exceptions/enhanced_exceptions.py
@@ -0,0 +1,184 @@
+from typing import Dict, List, Optional, Any
+from fastapi import status
+from cognee.shared.logging_utils import get_logger
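+
+# Usage sketch (illustrative values): raising any subclass defined below returns
+# a structured response through the FastAPI handler in cognee/api/client.py:
+#
+#     raise CogneeValidationError(
+#         message="Either datasetId or datasetName must be provided",
+#         user_message="You must specify either a dataset name or dataset ID.",
+#         suggestions=["Provide a datasetName parameter"],
+#         operation="add",
+#     )
+#
+# The handler serializes the exception with to_dict() and responds with its
+# status_code (HTTP 422 for validation errors).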
+
+logger = get_logger()
+
+
+class CogneeBaseError(Exception):
+    """
+    Base exception for all Cognee errors with enhanced context and user experience.
+
+    This class provides a foundation for all Cognee exceptions with:
+    - Rich error context
+    - User-friendly messages
+    - Actionable suggestions
+    - Documentation links
+    - Retry information
+    """
+
+    def __init__(
+        self,
+        message: str,
+        user_message: Optional[str] = None,
+        suggestions: Optional[List[str]] = None,
+        docs_link: Optional[str] = None,
+        context: Optional[Dict[str, Any]] = None,
+        is_retryable: bool = False,
+        status_code: int = status.HTTP_500_INTERNAL_SERVER_ERROR,
+        log_level: str = "ERROR",
+        operation: Optional[str] = None,
+    ):
+        self.message = message
+        self.user_message = user_message or message
+        self.suggestions = suggestions or []
+        self.docs_link = docs_link
+        self.context = context or {}
+        self.is_retryable = is_retryable
+        self.status_code = status_code
+        self.operation = operation
+
+        # Automatically log the exception
+        if log_level == "ERROR":
+            logger.error(f"CogneeError in {operation or 'unknown'}: {message}", extra=self.context)
+        elif log_level == "WARNING":
+            logger.warning(
+                f"CogneeWarning in {operation or 'unknown'}: {message}", extra=self.context
+            )
+        elif log_level == "INFO":
+            logger.info(f"CogneeInfo in {operation or 'unknown'}: {message}", extra=self.context)
+
+        super().__init__(self.message)
+
+    def __str__(self):
+        return f"{self.__class__.__name__}: {self.message}"
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Convert exception to dictionary for API responses"""
+        return {
+            "type": self.__class__.__name__,
+            "message": self.user_message,
+            "technical_message": self.message,
+            "suggestions": self.suggestions,
+            "docs_link": self.docs_link,
+            "is_retryable": self.is_retryable,
+            "context": self.context,
+            "operation": self.operation,
+        }
+
+
+class CogneeUserError(CogneeBaseError):
+    """
+    User-fixable errors (4xx status codes).
+
+    These are errors caused by user input or actions that can be corrected
+    by the user. Examples: invalid file format, missing required field.
+    """
+
+    def __init__(self, *args, **kwargs):
+        kwargs.setdefault("status_code", status.HTTP_400_BAD_REQUEST)
+        kwargs.setdefault("log_level", "WARNING")
+        super().__init__(*args, **kwargs)
+
+
+class CogneeSystemError(CogneeBaseError):
+    """
+    System/infrastructure errors (5xx status codes).
+
+    These are errors caused by system issues that require technical intervention.
+    Examples: database connection failure, service unavailable.
+    """
+
+    def __init__(self, *args, **kwargs):
+        kwargs.setdefault("status_code", status.HTTP_500_INTERNAL_SERVER_ERROR)
+        kwargs.setdefault("log_level", "ERROR")
+        super().__init__(*args, **kwargs)
+
+
+class CogneeTransientError(CogneeBaseError):
+    """
+    Temporary/retryable errors.
+
+    These are errors that might succeed if retried, often due to temporary
+    resource constraints or network issues.
+    """
+
+    def __init__(self, *args, **kwargs):
+        kwargs.setdefault("status_code", status.HTTP_503_SERVICE_UNAVAILABLE)
+        kwargs.setdefault("is_retryable", True)
+        kwargs.setdefault("log_level", "WARNING")
+        super().__init__(*args, **kwargs)
+
+
+class CogneeConfigurationError(CogneeBaseError):
+    """
+    Setup/configuration errors.
+
+    These are errors related to missing or invalid configuration that
+    prevent the system from operating correctly.
+ """ + + def __init__(self, *args, **kwargs): + kwargs.setdefault("status_code", status.HTTP_422_UNPROCESSABLE_ENTITY) + kwargs.setdefault("log_level", "ERROR") + super().__init__(*args, **kwargs) + + +class CogneeValidationError(CogneeUserError): + """ + Input validation errors. + + Specific type of user error for invalid input data. + """ + + def __init__(self, *args, **kwargs): + kwargs.setdefault("status_code", status.HTTP_422_UNPROCESSABLE_ENTITY) + super().__init__(*args, **kwargs) + + +class CogneeAuthenticationError(CogneeUserError): + """ + Authentication and authorization errors. + """ + + def __init__(self, *args, **kwargs): + kwargs.setdefault("status_code", status.HTTP_401_UNAUTHORIZED) + super().__init__(*args, **kwargs) + + +class CogneePermissionError(CogneeUserError): + """ + Permission denied errors. + """ + + def __init__(self, *args, **kwargs): + kwargs.setdefault("status_code", status.HTTP_403_FORBIDDEN) + super().__init__(*args, **kwargs) + + +class CogneeNotFoundError(CogneeUserError): + """ + Resource not found errors. + """ + + def __init__(self, *args, **kwargs): + kwargs.setdefault("status_code", status.HTTP_404_NOT_FOUND) + super().__init__(*args, **kwargs) + + +class CogneeRateLimitError(CogneeTransientError): + """ + Rate limiting errors. + """ + + def __init__(self, *args, **kwargs): + kwargs.setdefault("status_code", status.HTTP_429_TOO_MANY_REQUESTS) + kwargs.setdefault( + "suggestions", + [ + "Wait a moment before retrying", + "Check your API rate limits", + "Consider using smaller batch sizes", + ], + ) + super().__init__(*args, **kwargs) diff --git a/cognee/modules/pipelines/operations/run_tasks_base.py b/cognee/modules/pipelines/operations/run_tasks_base.py index e5f577848..02f001c19 100644 --- a/cognee/modules/pipelines/operations/run_tasks_base.py +++ b/cognee/modules/pipelines/operations/run_tasks_base.py @@ -2,6 +2,14 @@ import inspect from cognee.shared.logging_utils import get_logger from cognee.modules.users.models import User from cognee.shared.utils import send_telemetry +from cognee.exceptions import ( + PipelineExecutionError, + CogneeTransientError, + CogneeSystemError, + CogneeUserError, + LLMConnectionError, + DatabaseConnectionError, +) from ..tasks.task import Task @@ -16,15 +24,33 @@ async def handle_task( user: User, context: dict = None, ): - """Handle common task workflow with logging, telemetry, and error handling around the core execution logic.""" - task_type = running_task.task_type + """ + Handle common task workflow with enhanced error handling and recovery strategies. 
+
+    This function provides comprehensive error handling for pipeline tasks with:
+    - Context-aware error reporting
+    - Automatic retry for transient errors
+    - Detailed error logging and telemetry
+    - User-friendly error messages
+    """
+    task_type = running_task.task_type
+    task_name = running_task.executable.__name__
+
+    logger.info(
+        f"{task_type} task started: `{task_name}`",
+        extra={
+            "task_type": task_type,
+            "task_name": task_name,
+            "user_id": user.id,
+            "context": context,
+        },
+    )

-    logger.info(f"{task_type} task started: `{running_task.executable.__name__}`")
     send_telemetry(
         f"{task_type} Task Started",
         user_id=user.id,
         additional_properties={
-            "task_name": running_task.executable.__name__,
+            "task_name": task_name,
         },
     )

@@ -35,36 +61,151 @@ async def handle_task(
     if has_context:
         args.append(context)

-    try:
-        async for result_data in running_task.execute(args, next_task_batch_size):
-            async for result in run_tasks_base(leftover_tasks, result_data, user, context):
-                yield result
+    # Retry configuration for transient errors
+    max_retries = 3
+    retry_count = 0

-        logger.info(f"{task_type} task completed: `{running_task.executable.__name__}`")
-        send_telemetry(
-            f"{task_type} Task Completed",
-            user_id=user.id,
-            additional_properties={
-                "task_name": running_task.executable.__name__,
-            },
-        )
-    except Exception as error:
-        logger.error(
-            f"{task_type} task errored: `{running_task.executable.__name__}`\n{str(error)}\n",
-            exc_info=True,
-        )
-        send_telemetry(
-            f"{task_type} Task Errored",
-            user_id=user.id,
-            additional_properties={
-                "task_name": running_task.executable.__name__,
-            },
-        )
-        raise error
+    while retry_count <= max_retries:
+        try:
+            async for result_data in running_task.execute(args, next_task_batch_size):
+                async for result in run_tasks_base(leftover_tasks, result_data, user, context):
+                    yield result
+
+            logger.info(
+                f"{task_type} task completed: `{task_name}`",
+                extra={
+                    "task_type": task_type,
+                    "task_name": task_name,
+                    "user_id": user.id,
+                    "retry_count": retry_count,
+                },
+            )
+
+            send_telemetry(
+                f"{task_type} Task Completed",
+                user_id=user.id,
+                additional_properties={
+                    "task_name": task_name,
+                    "retry_count": retry_count,
+                },
+            )
+            return  # Success, exit retry loop
+
+        except CogneeTransientError as error:
+            retry_count += 1
+            if retry_count <= max_retries:
+                logger.warning(
+                    f"Transient error in {task_type} task `{task_name}`, retrying ({retry_count}/{max_retries}): {error}",
+                    extra={
+                        "task_type": task_type,
+                        "task_name": task_name,
+                        "user_id": user.id,
+                        "retry_count": retry_count,
+                        "error_type": error.__class__.__name__,
+                    },
+                )
+                # Exponential backoff for retries
+                import asyncio
+
+                await asyncio.sleep(2**retry_count)
+                continue
+            else:
+                # Max retries exceeded, raise enhanced error
+                raise PipelineExecutionError(
+                    pipeline_name=f"{task_type}_pipeline",
+                    task_name=task_name,
+                    error_details=f"Max retries ({max_retries}) exceeded for transient error: {error}",
+                )
+
+        except (CogneeUserError, CogneeSystemError) as error:
+            # These errors shouldn't be retried, re-raise as pipeline execution error
+            logger.error(
+                f"{task_type} task failed: `{task_name}` - {error.__class__.__name__}: {error}",
+                extra={
+                    "task_type": task_type,
+                    "task_name": task_name,
+                    "user_id": user.id,
+                    "error_type": error.__class__.__name__,
+                    "error_context": getattr(error, "context", {}),
+                },
+                exc_info=True,
+            )
+
+            send_telemetry(
+                f"{task_type} Task Errored",
+                user_id=user.id,
+                additional_properties={
+                    "task_name": task_name,
+                    "error_type": error.__class__.__name__,
+                },
+            )
+
+            # Wrap in pipeline execution error with additional context
+            raise PipelineExecutionError(
+                pipeline_name=f"{task_type}_pipeline",
+                task_name=task_name,
+                error_details=f"{error.__class__.__name__}: {error}",
+                context={
+                    "original_error": error.__class__.__name__,
+                    "original_context": getattr(error, "context", {}),
+                    "user_id": user.id,
+                    "task_args": str(args)[:200],  # Truncate for logging
+                },
+            )
+
+        except Exception as error:
+            # Unexpected error, wrap in enhanced exception
+            logger.error(
+                f"{task_type} task encountered unexpected error: `{task_name}` - {error}",
+                extra={
+                    "task_type": task_type,
+                    "task_name": task_name,
+                    "user_id": user.id,
+                    "error_type": error.__class__.__name__,
+                },
+                exc_info=True,
+            )
+
+            send_telemetry(
+                f"{task_type} Task Errored",
+                user_id=user.id,
+                additional_properties={
+                    "task_name": task_name,
+                    "error_type": error.__class__.__name__,
+                },
+            )
+
+            # Check if this might be a known error type we can categorize
+            error_message = str(error).lower()
+            if any(term in error_message for term in ["connection", "timeout", "network"]):
+                if (
+                    "llm" in error_message
+                    or "openai" in error_message
+                    or "anthropic" in error_message
+                ):
+                    raise LLMConnectionError(provider="Unknown", model="Unknown", reason=str(error))
+                elif "database" in error_message or "sql" in error_message:
+                    raise DatabaseConnectionError(db_type="Unknown", reason=str(error))
+
+            # Default to pipeline execution error
+            raise PipelineExecutionError(
+                pipeline_name=f"{task_type}_pipeline",
+                task_name=task_name,
+                error_details=f"Unexpected error: {error}",
+                context={
+                    "error_type": error.__class__.__name__,
+                    "user_id": user.id,
+                    "task_args": str(args)[:200],  # Truncate for logging
+                },
+            )


 async def run_tasks_base(tasks: list[Task], data=None, user: User = None, context: dict = None):
-    """Base function to execute tasks in a pipeline, handling task type detection and execution."""
+    """
+    Base function to execute tasks in a pipeline with enhanced error handling.
+
+    Provides comprehensive error handling, logging, and recovery strategies for pipeline execution.
+    """
     if len(tasks) == 0:
         yield data
         return