From 56868d8a6f834590bc06ea629ee59467d14f5222 Mon Sep 17 00:00:00 2001 From: Boris Date: Sun, 29 Sep 2024 12:20:48 +0200 Subject: [PATCH] feat: add telemetry logging to pipelines and tasks (#140) * feat: add telemetry logging to pipelines and tasks * fix: enable telemetry for local environment --- .../ingestion/DatasetsView/DatasetsView.tsx | 2 +- .../ingestion/DatasetsView/StatusIcon.tsx | 4 +- cognee/api/v1/add/add.py | 3 +- cognee/api/v1/cognify/cognify_v2.py | 13 ++- cognee/api/v1/search/search.py | 8 +- .../modules/pipelines/operations/run_tasks.py | 110 +++++++++++++++--- cognee/shared/utils.py | 16 +-- entrypoint.sh | 2 +- 8 files changed, 118 insertions(+), 40 deletions(-) diff --git a/cognee-frontend/src/modules/ingestion/DatasetsView/DatasetsView.tsx b/cognee-frontend/src/modules/ingestion/DatasetsView/DatasetsView.tsx index 2b27fee32..13965230b 100644 --- a/cognee-frontend/src/modules/ingestion/DatasetsView/DatasetsView.tsx +++ b/cognee-frontend/src/modules/ingestion/DatasetsView/DatasetsView.tsx @@ -65,7 +65,7 @@ export default function DatasetsView({ - {dataset.status === 'DATASET_PROCESSING_FINISHED' ? ( + {dataset.status === 'DATASET_PROCESSING_COMPLETED' ? ( ) => handleExploreDataset(event, dataset)} > diff --git a/cognee-frontend/src/modules/ingestion/DatasetsView/StatusIcon.tsx b/cognee-frontend/src/modules/ingestion/DatasetsView/StatusIcon.tsx index b42c12ff8..c69bb7259 100644 --- a/cognee-frontend/src/modules/ingestion/DatasetsView/StatusIcon.tsx +++ b/cognee-frontend/src/modules/ingestion/DatasetsView/StatusIcon.tsx @@ -1,5 +1,5 @@ -export default function StatusIcon({ status }: { status: 'DATASET_PROCESSING_FINISHED' | string }) { - const isSuccess = status === 'DATASET_PROCESSING_FINISHED'; +export default function StatusIcon({ status }: { status: 'DATASET_PROCESSING_COMPLETED' | string }) { + const isSuccess = status === 'DATASET_PROCESSING_COMPLETED'; return (
own_document_ids = await get_document_ids_for_user(user.id) search_params = SearchParameters(search_type = search_type, params = params) - search_results = await specific_search([search_params]) + search_results = await specific_search([search_params], user) from uuid import UUID @@ -67,7 +67,7 @@ async def search(search_type: str, params: Dict[str, Any], user: User = None) -> return filtered_search_results -async def specific_search(query_params: List[SearchParameters]) -> List: +async def specific_search(query_params: List[SearchParameters], user) -> List: search_functions: Dict[SearchType, Callable] = { SearchType.ADJACENT: search_adjacent, SearchType.SUMMARY: search_summary, @@ -78,6 +78,8 @@ async def specific_search(query_params: List[SearchParameters]) -> List: search_tasks = [] + send_telemetry("cognee.search EXECUTION STARTED", user.id) + for search_param in query_params: search_func = search_functions.get(search_param.search_type) if search_func: @@ -88,6 +90,6 @@ async def specific_search(query_params: List[SearchParameters]) -> List: # Use asyncio.gather to run all scheduled tasks concurrently search_results = await asyncio.gather(*search_tasks) - send_telemetry("cognee.search") + send_telemetry("cognee.search EXECUTION COMPLETED", user.id) return search_results[0] if len(search_results) == 1 else search_results diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py index 99a6162d5..62a0c346a 100644 --- a/cognee/modules/pipelines/operations/run_tasks.py +++ b/cognee/modules/pipelines/operations/run_tasks.py @@ -1,10 +1,13 @@ import inspect import logging +from cognee.shared.utils import send_telemetry +from cognee.modules.users.models import User +from cognee.modules.users.methods import get_default_user from ..tasks.Task import Task logger = logging.getLogger("run_tasks(tasks: [Task], data)") -async def run_tasks(tasks: [Task], data = None): +async def run_tasks_base(tasks: [Task], data = None, user: User = None): if len(tasks) == 0: yield data return @@ -17,7 +20,10 @@ async def run_tasks(tasks: [Task], data = None): next_task_batch_size = next_task.task_config["batch_size"] if next_task else 1 if inspect.isasyncgenfunction(running_task.executable): - logger.info("Running async generator task: `%s`", running_task.executable.__name__) + logger.info("Async generator task started: `%s`", running_task.executable.__name__) + send_telemetry("Async Generator Task Started", user.id, { + "task_name": running_task.executable.__name__, + }) try: results = [] @@ -27,29 +33,42 @@ async def run_tasks(tasks: [Task], data = None): results.append(partial_result) if len(results) == next_task_batch_size: - async for result in run_tasks(leftover_tasks, results[0] if next_task_batch_size == 1 else results): + async for result in run_tasks_base( + leftover_tasks, + results[0] if next_task_batch_size == 1 else results, + user = user, + ): yield result results = [] if len(results) > 0: - async for result in run_tasks(leftover_tasks, results): + async for result in run_tasks_base(leftover_tasks, results, user): yield result results = [] - logger.info("Finished async generator task: `%s`", running_task.executable.__name__) + logger.info("Async generator task completed: `%s`", running_task.executable.__name__) + send_telemetry("Async Generator Task Completed", user.id, { + "task_name": running_task.executable.__name__, + }) except Exception as error: logger.error( - "Error occurred while running async generator task: `%s`\n%s\n", + "Async 
generator task errored: `%s`\n%s\n", running_task.executable.__name__, str(error), exc_info = True, ) + send_telemetry("Async Generator Task Errored", user.id, { + "task_name": running_task.executable.__name__, + }) raise error elif inspect.isgeneratorfunction(running_task.executable): - logger.info("Running generator task: `%s`", running_task.executable.__name__) + logger.info("Generator task started: `%s`", running_task.executable.__name__) + send_telemetry("Generator Task Started", user.id, { + "task_name": running_task.executable.__name__, + }) try: results = [] @@ -57,59 +76,112 @@ async def run_tasks(tasks: [Task], data = None): results.append(partial_result) if len(results) == next_task_batch_size: - async for result in run_tasks(leftover_tasks, results[0] if next_task_batch_size == 1 else results): + async for result in run_tasks_base(leftover_tasks, results[0] if next_task_batch_size == 1 else results, user): yield result results = [] if len(results) > 0: - async for result in run_tasks(leftover_tasks, results): + async for result in run_tasks_base(leftover_tasks, results, user): yield result results = [] - logger.info("Finished generator task: `%s`", running_task.executable.__name__) + logger.info("Generator task completed: `%s`", running_task.executable.__name__) + send_telemetry("Generator Task Completed", user_id = user.id, additional_properties = { + "task_name": running_task.executable.__name__, + }) except Exception as error: logger.error( - "Error occurred while running generator task: `%s`\n%s\n", + "Generator task errored: `%s`\n%s\n", running_task.executable.__name__, str(error), exc_info = True, ) + send_telemetry("Generator Task Errored", user_id = user.id, additional_properties = { + "task_name": running_task.executable.__name__, + }) raise error elif inspect.iscoroutinefunction(running_task.executable): - logger.info("Running coroutine task: `%s`", running_task.executable.__name__) + logger.info("Coroutine task started: `%s`", running_task.executable.__name__) + send_telemetry("Coroutine Task Started", user_id = user.id, additional_properties = { + "task_name": running_task.executable.__name__, + }) try: task_result = await running_task.run(*args) - async for result in run_tasks(leftover_tasks, task_result): + async for result in run_tasks_base(leftover_tasks, task_result, user): yield result - logger.info("Finished coroutine task: `%s`", running_task.executable.__name__) + logger.info("Coroutine task completed: `%s`", running_task.executable.__name__) + send_telemetry("Coroutine Task Completed", user.id, { + "task_name": running_task.executable.__name__, + }) except Exception as error: logger.error( - "Error occurred while running coroutine task: `%s`\n%s\n", + "Coroutine task errored: `%s`\n%s\n", running_task.executable.__name__, str(error), exc_info = True, ) + send_telemetry("Coroutine Task Errored", user.id, { + "task_name": running_task.executable.__name__, + }) raise error elif inspect.isfunction(running_task.executable): - logger.info("Running function task: `%s`", running_task.executable.__name__) + logger.info("Function task started: `%s`", running_task.executable.__name__) + send_telemetry("Function Task Started", user.id, { + "task_name": running_task.executable.__name__, + }) try: task_result = running_task.run(*args) - async for result in run_tasks(leftover_tasks, task_result): + async for result in run_tasks_base(leftover_tasks, task_result, user): yield result - logger.info("Finished function task: `%s`", running_task.executable.__name__) + 
logger.info("Function task completed: `%s`", running_task.executable.__name__) + send_telemetry("Function Task Completed", user.id, { + "task_name": running_task.executable.__name__, + }) except Exception as error: logger.error( - "Error occurred while running function task: `%s`\n%s\n", + "Function task errored: `%s`\n%s\n", running_task.executable.__name__, str(error), exc_info = True, ) + send_telemetry("Function Task Errored", user.id, { + "task_name": running_task.executable.__name__, + }) raise error + +async def run_tasks(tasks: [Task], data = None, pipeline_name: str = "default_pipeline"): + user = await get_default_user() + + try: + logger.info("Pipeline run started: `%s`", pipeline_name) + send_telemetry("Pipeline Run Started", user.id, { + "pipeline_name": pipeline_name, + }) + + async for result in run_tasks_base(tasks, data, user): + yield result + + logger.info("Pipeline run completed: `%s`", pipeline_name) + send_telemetry("Pipeline Run Completed", user.id, { + "pipeline_name": pipeline_name, + }) + except Exception as error: + logger.error( + "Pipeline run errored: `%s`\n%s\n", + pipeline_name, + str(error), + exc_info = True, + ) + send_telemetry("Pipeline Run Errored", user.id, { + "pipeline_name": pipeline_name, + }) + + raise error diff --git a/cognee/shared/utils.py b/cognee/shared/utils.py index 76a395504..f6b75f4e0 100644 --- a/cognee/shared/utils.py +++ b/cognee/shared/utils.py @@ -1,7 +1,5 @@ """ This module contains utility functions for the cognee. """ -import logging import os -import uuid import datetime import graphistry import networkx as nx @@ -14,25 +12,23 @@ from posthog import Posthog from cognee.base_config import get_base_config from cognee.infrastructure.databases.graph import get_graph_engine -def send_telemetry(event_name: str): +def send_telemetry(event_name: str, user_id, additional_properties: dict = {}): if os.getenv("TELEMETRY_DISABLED"): - print("Telemetry is disabled.") - logging.info("Telemetry is disabled.") return env = os.getenv("ENV") - if env in ["local", "test", "dev"]: + if env in ["test", "dev"]: return posthog = Posthog( - project_api_key = "phc_bbR86N876kwub62Lr3dhQ7zIeRyMMMm0fxXqxPqzLm3", - host="https://eu.i.posthog.com" + project_api_key = "phc_UB1YVere1KtJg1MFxAo6ABfpkwN3OxCvGNDkMTjvH0", + host = "https://eu.i.posthog.com" ) - user_id = str(uuid.uuid4()) current_time = datetime.datetime.now() properties = { - "time": current_time.strftime("%m/%d/%Y") + "time": current_time.strftime("%m/%d/%Y"), + **additional_properties, } try: diff --git a/entrypoint.sh b/entrypoint.sh index ed8b4780a..e1f9c75b7 100755 --- a/entrypoint.sh +++ b/entrypoint.sh @@ -5,7 +5,7 @@ echo "Environment: $ENVIRONMENT" echo "Starting Gunicorn" -if [ "$ENVIRONMENT" = "local" ]; then +if [ "$ENVIRONMENT" = "dev" ]; then if [ "$DEBUG" = true ]; then echo "Waiting for the debugger to attach..."