) => handleExploreDataset(event, dataset)}
>
diff --git a/cognee-frontend/src/modules/ingestion/DatasetsView/StatusIcon.tsx b/cognee-frontend/src/modules/ingestion/DatasetsView/StatusIcon.tsx
index b42c12ff8..c69bb7259 100644
--- a/cognee-frontend/src/modules/ingestion/DatasetsView/StatusIcon.tsx
+++ b/cognee-frontend/src/modules/ingestion/DatasetsView/StatusIcon.tsx
@@ -1,5 +1,5 @@
-export default function StatusIcon({ status }: { status: 'DATASET_PROCESSING_FINISHED' | string }) {
- const isSuccess = status === 'DATASET_PROCESSING_FINISHED';
+export default function StatusIcon({ status }: { status: 'DATASET_PROCESSING_COMPLETED' | string }) {
+ const isSuccess = status === 'DATASET_PROCESSING_COMPLETED';
return (
from uuid import UUID
@@ -67,7 +67,7 @@ async def search(search_type: str, params: Dict[str, Any], user: User = None) ->
    own_document_ids = await get_document_ids_for_user(user.id)
    search_params = SearchParameters(search_type = search_type, params = params)
-    search_results = await specific_search([search_params])
+    search_results = await specific_search([search_params], user)
    return filtered_search_results
-async def specific_search(query_params: List[SearchParameters]) -> List:
+async def specific_search(query_params: List[SearchParameters], user) -> List:
search_functions: Dict[SearchType, Callable] = {
SearchType.ADJACENT: search_adjacent,
SearchType.SUMMARY: search_summary,
@@ -78,6 +78,8 @@ async def specific_search(query_params: List[SearchParameters]) -> List:
search_tasks = []
+ send_telemetry("cognee.search EXECUTION STARTED", user.id)
+
for search_param in query_params:
search_func = search_functions.get(search_param.search_type)
if search_func:
@@ -88,6 +90,6 @@ async def specific_search(query_params: List[SearchParameters]) -> List:
# Use asyncio.gather to run all scheduled tasks concurrently
search_results = await asyncio.gather(*search_tasks)
- send_telemetry("cognee.search")
+ send_telemetry("cognee.search EXECUTION COMPLETED", user.id)
return search_results[0] if len(search_results) == 1 else search_results
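
For orientation, a minimal call-site sketch of the updated specific_search. It assumes the SearchParameters, SearchType, and specific_search names from the hunks above are in scope at the call site (their module path is not shown in this excerpt), and the params dict is purely illustrative:

    from cognee.modules.users.methods import get_default_user

    async def run_summary_search():
        # specific_search now takes the user so the "cognee.search EXECUTION STARTED"
        # and "cognee.search EXECUTION COMPLETED" telemetry events are attributed to user.id.
        user = await get_default_user()
        search_params = SearchParameters(search_type = SearchType.SUMMARY, params = {"query": "example"})  # params shape is illustrative
        return await specific_search([search_params], user)
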
diff --git a/cognee/modules/pipelines/operations/run_tasks.py b/cognee/modules/pipelines/operations/run_tasks.py
index 99a6162d5..62a0c346a 100644
--- a/cognee/modules/pipelines/operations/run_tasks.py
+++ b/cognee/modules/pipelines/operations/run_tasks.py
@@ -1,10 +1,13 @@
import inspect
import logging
+from cognee.shared.utils import send_telemetry
+from cognee.modules.users.models import User
+from cognee.modules.users.methods import get_default_user
from ..tasks.Task import Task
logger = logging.getLogger("run_tasks(tasks: [Task], data)")
-async def run_tasks(tasks: [Task], data = None):
+async def run_tasks_base(tasks: [Task], data = None, user: User = None):
if len(tasks) == 0:
yield data
return
@@ -17,7 +20,10 @@ async def run_tasks(tasks: [Task], data = None):
next_task_batch_size = next_task.task_config["batch_size"] if next_task else 1
if inspect.isasyncgenfunction(running_task.executable):
- logger.info("Running async generator task: `%s`", running_task.executable.__name__)
+ logger.info("Async generator task started: `%s`", running_task.executable.__name__)
+ send_telemetry("Async Generator Task Started", user.id, {
+ "task_name": running_task.executable.__name__,
+ })
try:
results = []
@@ -27,29 +33,42 @@ async def run_tasks(tasks: [Task], data = None):
results.append(partial_result)
if len(results) == next_task_batch_size:
- async for result in run_tasks(leftover_tasks, results[0] if next_task_batch_size == 1 else results):
+ async for result in run_tasks_base(
+ leftover_tasks,
+ results[0] if next_task_batch_size == 1 else results,
+ user = user,
+ ):
yield result
results = []
if len(results) > 0:
- async for result in run_tasks(leftover_tasks, results):
+ async for result in run_tasks_base(leftover_tasks, results, user):
yield result
results = []
- logger.info("Finished async generator task: `%s`", running_task.executable.__name__)
+ logger.info("Async generator task completed: `%s`", running_task.executable.__name__)
+ send_telemetry("Async Generator Task Completed", user.id, {
+ "task_name": running_task.executable.__name__,
+ })
except Exception as error:
logger.error(
- "Error occurred while running async generator task: `%s`\n%s\n",
+ "Async generator task errored: `%s`\n%s\n",
running_task.executable.__name__,
str(error),
exc_info = True,
)
+ send_telemetry("Async Generator Task Errored", user.id, {
+ "task_name": running_task.executable.__name__,
+ })
raise error
elif inspect.isgeneratorfunction(running_task.executable):
- logger.info("Running generator task: `%s`", running_task.executable.__name__)
+ logger.info("Generator task started: `%s`", running_task.executable.__name__)
+ send_telemetry("Generator Task Started", user.id, {
+ "task_name": running_task.executable.__name__,
+ })
try:
results = []
@@ -57,59 +76,112 @@ async def run_tasks(tasks: [Task], data = None):
results.append(partial_result)
if len(results) == next_task_batch_size:
- async for result in run_tasks(leftover_tasks, results[0] if next_task_batch_size == 1 else results):
+ async for result in run_tasks_base(leftover_tasks, results[0] if next_task_batch_size == 1 else results, user):
yield result
results = []
if len(results) > 0:
- async for result in run_tasks(leftover_tasks, results):
+ async for result in run_tasks_base(leftover_tasks, results, user):
yield result
results = []
- logger.info("Finished generator task: `%s`", running_task.executable.__name__)
+ logger.info("Generator task completed: `%s`", running_task.executable.__name__)
+ send_telemetry("Generator Task Completed", user_id = user.id, additional_properties = {
+ "task_name": running_task.executable.__name__,
+ })
except Exception as error:
logger.error(
- "Error occurred while running generator task: `%s`\n%s\n",
+ "Generator task errored: `%s`\n%s\n",
running_task.executable.__name__,
str(error),
exc_info = True,
)
+ send_telemetry("Generator Task Errored", user_id = user.id, additional_properties = {
+ "task_name": running_task.executable.__name__,
+ })
raise error
elif inspect.iscoroutinefunction(running_task.executable):
- logger.info("Running coroutine task: `%s`", running_task.executable.__name__)
+ logger.info("Coroutine task started: `%s`", running_task.executable.__name__)
+ send_telemetry("Coroutine Task Started", user_id = user.id, additional_properties = {
+ "task_name": running_task.executable.__name__,
+ })
try:
task_result = await running_task.run(*args)
- async for result in run_tasks(leftover_tasks, task_result):
+ async for result in run_tasks_base(leftover_tasks, task_result, user):
yield result
- logger.info("Finished coroutine task: `%s`", running_task.executable.__name__)
+ logger.info("Coroutine task completed: `%s`", running_task.executable.__name__)
+ send_telemetry("Coroutine Task Completed", user.id, {
+ "task_name": running_task.executable.__name__,
+ })
except Exception as error:
logger.error(
- "Error occurred while running coroutine task: `%s`\n%s\n",
+ "Coroutine task errored: `%s`\n%s\n",
running_task.executable.__name__,
str(error),
exc_info = True,
)
+ send_telemetry("Coroutine Task Errored", user.id, {
+ "task_name": running_task.executable.__name__,
+ })
raise error
elif inspect.isfunction(running_task.executable):
- logger.info("Running function task: `%s`", running_task.executable.__name__)
+ logger.info("Function task started: `%s`", running_task.executable.__name__)
+ send_telemetry("Function Task Started", user.id, {
+ "task_name": running_task.executable.__name__,
+ })
try:
task_result = running_task.run(*args)
- async for result in run_tasks(leftover_tasks, task_result):
+ async for result in run_tasks_base(leftover_tasks, task_result, user):
yield result
- logger.info("Finished function task: `%s`", running_task.executable.__name__)
+ logger.info("Function task completed: `%s`", running_task.executable.__name__)
+ send_telemetry("Function Task Completed", user.id, {
+ "task_name": running_task.executable.__name__,
+ })
except Exception as error:
logger.error(
- "Error occurred while running function task: `%s`\n%s\n",
+ "Function task errored: `%s`\n%s\n",
running_task.executable.__name__,
str(error),
exc_info = True,
)
+ send_telemetry("Function Task Errored", user.id, {
+ "task_name": running_task.executable.__name__,
+ })
raise error
+
+async def run_tasks(tasks: [Task], data = None, pipeline_name: str = "default_pipeline"):
+ user = await get_default_user()
+
+ try:
+ logger.info("Pipeline run started: `%s`", pipeline_name)
+ send_telemetry("Pipeline Run Started", user.id, {
+ "pipeline_name": pipeline_name,
+ })
+
+ async for result in run_tasks_base(tasks, data, user):
+ yield result
+
+ logger.info("Pipeline run completed: `%s`", pipeline_name)
+ send_telemetry("Pipeline Run Completed", user.id, {
+ "pipeline_name": pipeline_name,
+ })
+ except Exception as error:
+ logger.error(
+ "Pipeline run errored: `%s`\n%s\n",
+ pipeline_name,
+ str(error),
+ exc_info = True,
+ )
+ send_telemetry("Pipeline Run Errored", user.id, {
+ "pipeline_name": pipeline_name,
+ })
+
+ raise error
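
A usage sketch for the new run_tasks wrapper. The task callables and the task_config keyword argument are hypothetical (only the task_config attribute and its batch_size key appear in the diff); run_tasks itself resolves the default user and brackets run_tasks_base with the pipeline-level telemetry added above:

    from cognee.modules.pipelines.tasks.Task import Task  # absolute form of the relative ..tasks.Task import above

    def chunk_documents(documents):  # hypothetical stand-in task
        return [document.split() for document in documents]

    async def embed_chunks(chunks):  # hypothetical stand-in task
        return len(chunks)

    async def run_example_pipeline(documents):
        tasks = [
            Task(chunk_documents),
            Task(embed_chunks, task_config = {"batch_size": 10}),  # assumed constructor kwarg, mirroring next_task.task_config["batch_size"]
        ]
        # Emits "Pipeline Run Started/Completed/Errored" around the run, plus per-task
        # Started/Completed/Errored events tagged with task_name, all under the default user's id.
        async for result in run_tasks(tasks, documents, pipeline_name = "example_pipeline"):
            print(result)
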
diff --git a/cognee/shared/utils.py b/cognee/shared/utils.py
index 76a395504..f6b75f4e0 100644
--- a/cognee/shared/utils.py
+++ b/cognee/shared/utils.py
@@ -1,7 +1,5 @@
""" This module contains utility functions for the cognee. """
-import logging
import os
-import uuid
import datetime
import graphistry
import networkx as nx
@@ -14,25 +12,23 @@ from posthog import Posthog
from cognee.base_config import get_base_config
from cognee.infrastructure.databases.graph import get_graph_engine
-def send_telemetry(event_name: str):
+def send_telemetry(event_name: str, user_id, additional_properties: dict = {}):
if os.getenv("TELEMETRY_DISABLED"):
- print("Telemetry is disabled.")
- logging.info("Telemetry is disabled.")
return
env = os.getenv("ENV")
- if env in ["local", "test", "dev"]:
+ if env in ["test", "dev"]:
return
posthog = Posthog(
- project_api_key = "phc_bbR86N876kwub62Lr3dhQ7zIeRyMMMm0fxXqxPqzLm3",
- host="https://eu.i.posthog.com"
+ project_api_key = "phc_UB1YVere1KtJg1MFxAo6ABfpkwN3OxCvGNDkMTjvH0",
+ host = "https://eu.i.posthog.com"
)
- user_id = str(uuid.uuid4())
current_time = datetime.datetime.now()
properties = {
- "time": current_time.strftime("%m/%d/%Y")
+ "time": current_time.strftime("%m/%d/%Y"),
+ **additional_properties,
}
try:
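
For reference, a sketch of the new send_telemetry call shape used throughout the changes above. The event name and properties are illustrative; user_id is whatever UUID the caller already holds (user.id in the pipeline and search code):

    from cognee.shared.utils import send_telemetry
    from cognee.modules.users.methods import get_default_user

    async def report_example_event():
        user = await get_default_user()
        # No event is sent when TELEMETRY_DISABLED is set or ENV is "test"/"dev";
        # additional_properties is merged into the PostHog properties next to "time".
        send_telemetry("Pipeline Run Started", user.id, {
            "pipeline_name": "example_pipeline",
        })
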
diff --git a/entrypoint.sh b/entrypoint.sh
index ed8b4780a..e1f9c75b7 100755
--- a/entrypoint.sh
+++ b/entrypoint.sh
@@ -5,7 +5,7 @@ echo "Environment: $ENVIRONMENT"
echo "Starting Gunicorn"
-if [ "$ENVIRONMENT" = "local" ]; then
+if [ "$ENVIRONMENT" = "dev" ]; then
if [ "$DEBUG" = true ]; then
echo "Waiting for the debugger to attach..."