feat: add telemetry logging to pipelines and tasks (#140)

* feat: add telemetry logging to pipelines and tasks

* fix: enable telemetry for local environment
Boris 2024-09-29 12:20:48 +02:00 committed by GitHub
parent 41bf8617f1
commit 56868d8a6f
8 changed files with 118 additions and 40 deletions

View file

@@ -65,7 +65,7 @@ export default function DatasetsView({
       <StatusIcon status={dataset.status} />
       <DropdownMenu>
         <Stack gap="1" className={styles.datasetMenu} orientation="vertical">
-          {dataset.status === 'DATASET_PROCESSING_FINISHED' ? (
+          {dataset.status === 'DATASET_PROCESSING_COMPLETED' ? (
             <CTAButton
               onClick={(event: React.MouseEvent<HTMLButtonElement>) => handleExploreDataset(event, dataset)}
             >

View file

@@ -1,5 +1,5 @@
-export default function StatusIcon({ status }: { status: 'DATASET_PROCESSING_FINISHED' | string }) {
-  const isSuccess = status === 'DATASET_PROCESSING_FINISHED';
+export default function StatusIcon({ status }: { status: 'DATASET_PROCESSING_COMPLETED' | string }) {
+  const isSuccess = status === 'DATASET_PROCESSING_COMPLETED';

   return (
     <div

View file

@@ -152,12 +152,13 @@ async def add_files(file_paths: List[str], dataset_name: str, user: User = None)
     await give_permission_on_document(user, data_id, "write")

+    send_telemetry("cognee.add EXECUTION STARTED", user_id = user.id)
     run_info = pipeline.run(
         data_resources(processed_file_paths, user),
         table_name = "file_metadata",
         dataset_name = dataset_name,
         write_disposition = "merge",
     )

-    send_telemetry("cognee.add")
+    send_telemetry("cognee.add EXECUTION COMPLETED", user_id = user.id)

     return run_info

View file

@@ -2,6 +2,7 @@ import asyncio
 import logging
 from typing import Union

+from cognee.shared.utils import send_telemetry
 from cognee.modules.cognify.config import get_cognify_config
 from cognee.shared.data_models import KnowledgeGraph
 from cognee.modules.data.models import Dataset, Data
@@ -69,6 +70,8 @@ async def run_cognify_pipeline(dataset: Dataset, user: User):
     dataset_id = dataset.id
     dataset_name = generate_dataset_name(dataset.name)

+    send_telemetry("cognee.cognify EXECUTION STARTED", user.id)
+
     async with update_status_lock:
         task_status = await get_pipeline_status([dataset_id])
@@ -110,17 +113,21 @@ async def run_cognify_pipeline(dataset: Dataset, user: User):
             Task(chunk_remove_disconnected), # Remove the obsolete document chunks.
         ]

-        pipeline = run_tasks(tasks, data_documents)
+        pipeline = run_tasks(tasks, data_documents, "cognify_pipeline")

         async for result in pipeline:
             print(result)

-        await log_pipeline_status(dataset_id, "DATASET_PROCESSING_FINISHED", {
+        send_telemetry("cognee.cognify EXECUTION COMPLETED", user.id)
+
+        await log_pipeline_status(dataset_id, "DATASET_PROCESSING_COMPLETED", {
             "dataset_name": dataset_name,
             "files": document_ids_str,
         })
     except Exception as error:
-        await log_pipeline_status(dataset_id, "DATASET_PROCESSING_ERROR", {
+        send_telemetry("cognee.cognify EXECUTION ERRORED", user.id)
+
+        await log_pipeline_status(dataset_id, "DATASET_PROCESSING_ERRORED", {
             "dataset_name": dataset_name,
             "files": document_ids_str,
         })

View file

@@ -51,7 +51,7 @@ async def search(search_type: str, params: Dict[str, Any], user: User = None) ->
     own_document_ids = await get_document_ids_for_user(user.id)

     search_params = SearchParameters(search_type = search_type, params = params)
-    search_results = await specific_search([search_params])
+    search_results = await specific_search([search_params], user)

     from uuid import UUID
@@ -67,7 +67,7 @@ async def search(search_type: str, params: Dict[str, Any], user: User = None) ->
     return filtered_search_results

-async def specific_search(query_params: List[SearchParameters]) -> List:
+async def specific_search(query_params: List[SearchParameters], user) -> List:
     search_functions: Dict[SearchType, Callable] = {
         SearchType.ADJACENT: search_adjacent,
         SearchType.SUMMARY: search_summary,
@@ -78,6 +78,8 @@ async def specific_search(query_params: List[SearchParameters]) -> List:
     search_tasks = []

+    send_telemetry("cognee.search EXECUTION STARTED", user.id)
+
     for search_param in query_params:
         search_func = search_functions.get(search_param.search_type)
         if search_func:
@@ -88,6 +90,6 @@ async def specific_search(query_params: List[SearchParameters]) -> List:
     # Use asyncio.gather to run all scheduled tasks concurrently
     search_results = await asyncio.gather(*search_tasks)

-    send_telemetry("cognee.search")
+    send_telemetry("cognee.search EXECUTION COMPLETED", user.id)

     return search_results[0] if len(search_results) == 1 else search_results
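
For reference, a minimal calling sketch of the updated search flow. This is a hedged illustration: the module paths, the "SUMMARY" search type string, and the params shape are assumptions; only the changed signatures above come from the diff.

import asyncio

from cognee.api.v1.search.search_v2 import search  # assumed module path
from cognee.modules.users.methods import get_default_user

async def main():
    user = await get_default_user()
    # search() forwards the user into specific_search(), which now emits
    # "cognee.search EXECUTION STARTED/COMPLETED" events tagged with user.id.
    results = await search("SUMMARY", {"query": "telemetry"}, user)
    print(results)

asyncio.run(main())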

View file

@@ -1,10 +1,13 @@
 import inspect
 import logging
+from cognee.shared.utils import send_telemetry
+from cognee.modules.users.models import User
+from cognee.modules.users.methods import get_default_user
 from ..tasks.Task import Task

 logger = logging.getLogger("run_tasks(tasks: [Task], data)")

-async def run_tasks(tasks: [Task], data = None):
+async def run_tasks_base(tasks: [Task], data = None, user: User = None):
     if len(tasks) == 0:
         yield data
         return
@@ -17,7 +20,10 @@ async def run_tasks(tasks: [Task], data = None):
     next_task_batch_size = next_task.task_config["batch_size"] if next_task else 1

     if inspect.isasyncgenfunction(running_task.executable):
-        logger.info("Running async generator task: `%s`", running_task.executable.__name__)
+        logger.info("Async generator task started: `%s`", running_task.executable.__name__)
+        send_telemetry("Async Generator Task Started", user.id, {
+            "task_name": running_task.executable.__name__,
+        })

         try:
             results = []
@@ -27,29 +33,42 @@ async def run_tasks(tasks: [Task], data = None):
                 results.append(partial_result)

                 if len(results) == next_task_batch_size:
-                    async for result in run_tasks(leftover_tasks, results[0] if next_task_batch_size == 1 else results):
+                    async for result in run_tasks_base(
+                        leftover_tasks,
+                        results[0] if next_task_batch_size == 1 else results,
+                        user = user,
+                    ):
                         yield result

                     results = []

             if len(results) > 0:
-                async for result in run_tasks(leftover_tasks, results):
+                async for result in run_tasks_base(leftover_tasks, results, user):
                     yield result

                 results = []

-            logger.info("Finished async generator task: `%s`", running_task.executable.__name__)
+            logger.info("Async generator task completed: `%s`", running_task.executable.__name__)
+            send_telemetry("Async Generator Task Completed", user.id, {
+                "task_name": running_task.executable.__name__,
+            })
         except Exception as error:
             logger.error(
-                "Error occurred while running async generator task: `%s`\n%s\n",
+                "Async generator task errored: `%s`\n%s\n",
                 running_task.executable.__name__,
                 str(error),
                 exc_info = True,
             )
+            send_telemetry("Async Generator Task Errored", user.id, {
+                "task_name": running_task.executable.__name__,
+            })
             raise error

     elif inspect.isgeneratorfunction(running_task.executable):
-        logger.info("Running generator task: `%s`", running_task.executable.__name__)
+        logger.info("Generator task started: `%s`", running_task.executable.__name__)
+        send_telemetry("Generator Task Started", user.id, {
+            "task_name": running_task.executable.__name__,
+        })

         try:
             results = []
@@ -57,59 +76,112 @@ async def run_tasks(tasks: [Task], data = None):
                 results.append(partial_result)

                 if len(results) == next_task_batch_size:
-                    async for result in run_tasks(leftover_tasks, results[0] if next_task_batch_size == 1 else results):
+                    async for result in run_tasks_base(leftover_tasks, results[0] if next_task_batch_size == 1 else results, user):
                         yield result

                     results = []

             if len(results) > 0:
-                async for result in run_tasks(leftover_tasks, results):
+                async for result in run_tasks_base(leftover_tasks, results, user):
                     yield result

                 results = []

-            logger.info("Finished generator task: `%s`", running_task.executable.__name__)
+            logger.info("Generator task completed: `%s`", running_task.executable.__name__)
+            send_telemetry("Generator Task Completed", user_id = user.id, additional_properties = {
+                "task_name": running_task.executable.__name__,
+            })
         except Exception as error:
             logger.error(
-                "Error occurred while running generator task: `%s`\n%s\n",
+                "Generator task errored: `%s`\n%s\n",
                 running_task.executable.__name__,
                 str(error),
                 exc_info = True,
             )
+            send_telemetry("Generator Task Errored", user_id = user.id, additional_properties = {
+                "task_name": running_task.executable.__name__,
+            })
             raise error

     elif inspect.iscoroutinefunction(running_task.executable):
-        logger.info("Running coroutine task: `%s`", running_task.executable.__name__)
+        logger.info("Coroutine task started: `%s`", running_task.executable.__name__)
+        send_telemetry("Coroutine Task Started", user_id = user.id, additional_properties = {
+            "task_name": running_task.executable.__name__,
+        })

         try:
             task_result = await running_task.run(*args)

-            async for result in run_tasks(leftover_tasks, task_result):
+            async for result in run_tasks_base(leftover_tasks, task_result, user):
                 yield result

-            logger.info("Finished coroutine task: `%s`", running_task.executable.__name__)
+            logger.info("Coroutine task completed: `%s`", running_task.executable.__name__)
+            send_telemetry("Coroutine Task Completed", user.id, {
+                "task_name": running_task.executable.__name__,
+            })
         except Exception as error:
             logger.error(
-                "Error occurred while running coroutine task: `%s`\n%s\n",
+                "Coroutine task errored: `%s`\n%s\n",
                 running_task.executable.__name__,
                 str(error),
                 exc_info = True,
             )
+            send_telemetry("Coroutine Task Errored", user.id, {
+                "task_name": running_task.executable.__name__,
+            })
             raise error

     elif inspect.isfunction(running_task.executable):
-        logger.info("Running function task: `%s`", running_task.executable.__name__)
+        logger.info("Function task started: `%s`", running_task.executable.__name__)
+        send_telemetry("Function Task Started", user.id, {
+            "task_name": running_task.executable.__name__,
+        })

         try:
             task_result = running_task.run(*args)

-            async for result in run_tasks(leftover_tasks, task_result):
+            async for result in run_tasks_base(leftover_tasks, task_result, user):
                 yield result

-            logger.info("Finished function task: `%s`", running_task.executable.__name__)
+            logger.info("Function task completed: `%s`", running_task.executable.__name__)
+            send_telemetry("Function Task Completed", user.id, {
+                "task_name": running_task.executable.__name__,
+            })
         except Exception as error:
             logger.error(
-                "Error occurred while running function task: `%s`\n%s\n",
+                "Function task errored: `%s`\n%s\n",
                 running_task.executable.__name__,
                 str(error),
                 exc_info = True,
             )
+            send_telemetry("Function Task Errored", user.id, {
+                "task_name": running_task.executable.__name__,
+            })
             raise error

+async def run_tasks(tasks: [Task], data = None, pipeline_name: str = "default_pipeline"):
+    user = await get_default_user()
+
+    try:
+        logger.info("Pipeline run started: `%s`", pipeline_name)
+        send_telemetry("Pipeline Run Started", user.id, {
+            "pipeline_name": pipeline_name,
+        })
+
+        async for result in run_tasks_base(tasks, data, user):
+            yield result
+
+        logger.info("Pipeline run completed: `%s`", pipeline_name)
+        send_telemetry("Pipeline Run Completed", user.id, {
+            "pipeline_name": pipeline_name,
+        })
+    except Exception as error:
+        logger.error(
+            "Pipeline run errored: `%s`\n%s\n",
+            pipeline_name,
+            str(error),
+            exc_info = True,
+        )
+        send_telemetry("Pipeline Run Errored", user.id, {
+            "pipeline_name": pipeline_name,
+        })
+
+        raise error
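
The net effect of this file: the old recursive runner becomes run_tasks_base, and the new run_tasks wrapper resolves the default user and brackets every run with Pipeline Run Started/Completed/Errored events while threading the user into the per-task events. A rough usage sketch follows; the task bodies are hypothetical and the import paths are inferred from the relative imports above.

import asyncio

from cognee.modules.pipelines.tasks.Task import Task  # inferred path
from cognee.modules.pipelines.operations.run_tasks import run_tasks  # inferred path

def chunk_text(text: str):
    # A generator task: each run now emits "Generator Task Started/Completed".
    for piece in text.split("."):
        if piece.strip():
            yield piece.strip()

async def embed_chunk(chunk: str):
    # A coroutine task: each run now emits "Coroutine Task Started/Completed".
    return {"chunk": chunk, "vector": [0.0]}

async def main():
    # pipeline_name is attached to the "Pipeline Run *" telemetry events.
    async for result in run_tasks([Task(chunk_text), Task(embed_chunk)], "One. Two.", "demo_pipeline"):
        print(result)

asyncio.run(main())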

View file

@@ -1,7 +1,5 @@
 """ This module contains utility functions for the cognee. """
-import logging
 import os
-import uuid
 import datetime
 import graphistry
 import networkx as nx
@@ -14,25 +12,23 @@ from posthog import Posthog

 from cognee.base_config import get_base_config
 from cognee.infrastructure.databases.graph import get_graph_engine

-def send_telemetry(event_name: str):
+def send_telemetry(event_name: str, user_id, additional_properties: dict = {}):
     if os.getenv("TELEMETRY_DISABLED"):
-        print("Telemetry is disabled.")
-        logging.info("Telemetry is disabled.")
         return

     env = os.getenv("ENV")
-    if env in ["local", "test", "dev"]:
+    if env in ["test", "dev"]:
         return

     posthog = Posthog(
-        project_api_key = "phc_bbR86N876kwub62Lr3dhQ7zIeRyMMMm0fxXqxPqzLm3",
-        host="https://eu.i.posthog.com"
+        project_api_key = "phc_UB1YVere1KtJg1MFxAo6ABfpkwN3OxCvGNDkMTjvH0",
+        host = "https://eu.i.posthog.com"
     )

-    user_id = str(uuid.uuid4())
     current_time = datetime.datetime.now()

     properties = {
-        "time": current_time.strftime("%m/%d/%Y")
+        "time": current_time.strftime("%m/%d/%Y"),
+        **additional_properties,
     }

     try:
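
The hunk is cut off at the final try: block, so the actual capture call is not visible here. A hedged reconstruction of the updated helper: everything above try: matches the diff, while the posthog.capture call and the swallowed exception are assumptions about the truncated part.

import os
import datetime

from posthog import Posthog

def send_telemetry(event_name: str, user_id, additional_properties: dict = {}):  # mutable default kept as in the diff
    if os.getenv("TELEMETRY_DISABLED"):
        return

    env = os.getenv("ENV")
    if env in ["test", "dev"]:
        return

    posthog = Posthog(
        project_api_key = "phc_UB1YVere1KtJg1MFxAo6ABfpkwN3OxCvGNDkMTjvH0",
        host = "https://eu.i.posthog.com"
    )

    current_time = datetime.datetime.now()

    properties = {
        "time": current_time.strftime("%m/%d/%Y"),
        **additional_properties,
    }

    try:
        # Assumption: posthog-python's capture(distinct_id, event, properties)
        # is the call that follows the truncated try: in the diff.
        posthog.capture(user_id, event_name, properties)
    except Exception as error:
        # Assumption: telemetry failures are swallowed so they never
        # break the calling pipeline.
        print("Telemetry error:", error)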

View file

@@ -5,7 +5,7 @@ echo "Environment: $ENVIRONMENT"
 echo "Starting Gunicorn"

-if [ "$ENVIRONMENT" = "local" ]; then
+if [ "$ENVIRONMENT" = "dev" ]; then
     if [ "$DEBUG" = true ]; then
         echo "Waiting for the debugger to attach..."