<!-- .github/pull_request_template.md --> ## Description Adds new tenant id (if available) to telemetry packages for basic cognee operations ## Type of Change <!-- Please check the relevant option --> - [ ] Bug fix (non-breaking change that fixes an issue) - [ ] New feature (non-breaking change that adds functionality) - [ ] Breaking change (fix or feature that would cause existing functionality to change) - [ ] Documentation update - [x] Code refactoring - [ ] Performance improvement - [ ] Other (please specify): ## Screenshots/Videos (if applicable) <!-- Add screenshots or videos to help explain your changes --> ## Pre-submission Checklist <!-- Please check all boxes that apply before submitting your PR --> - [x] **I have tested my changes thoroughly before submitting this PR** - [x] **This PR contains minimal changes necessary to address the issue/feature** - [x] My code follows the project's coding standards and style guidelines - [x] I have added tests that prove my fix is effective or that my feature works - [x] I have added necessary documentation (if applicable) - [x] All new and existing tests pass - [x] I have searched existing PRs to ensure this change hasn't been submitted already - [x] I have linked any relevant issues in the description - [x] My commits have clear and descriptive messages ## DCO Affirmation I affirm that all code in every commit of this pull request conforms to the terms of the Topoteretes Developer Certificate of Origin.
68 lines
2.1 KiB
Python
68 lines
2.1 KiB
Python
import json
|
|
|
|
from cognee.modules.settings import get_current_settings
|
|
from cognee.modules.users.models import User
|
|
from cognee.shared.logging_utils import get_logger
|
|
from cognee.shared.utils import send_telemetry
|
|
from cognee import __version__ as cognee_version
|
|
|
|
from .run_tasks_base import run_tasks_base
|
|
from ..tasks.task import Task
|
|
|
|
|
|
logger = get_logger("run_tasks_with_telemetry()")
|
|
|
|
|
|
async def run_tasks_with_telemetry(
|
|
tasks: list[Task], data, user: User, pipeline_name: str, context: dict = None
|
|
):
|
|
config = get_current_settings()
|
|
|
|
logger.debug("\nRunning pipeline with configuration:\n%s\n", json.dumps(config, indent=1))
|
|
|
|
try:
|
|
logger.info("Pipeline run started: `%s`", pipeline_name)
|
|
send_telemetry(
|
|
"Pipeline Run Started",
|
|
user.id,
|
|
additional_properties={
|
|
"pipeline_name": str(pipeline_name),
|
|
"cognee_version": cognee_version,
|
|
"tenant_id": str(user.tenant_id) if user.tenant_id else "Single User Tenant",
|
|
}
|
|
| config,
|
|
)
|
|
|
|
async for result in run_tasks_base(tasks, data, user, context):
|
|
yield result
|
|
|
|
logger.info("Pipeline run completed: `%s`", pipeline_name)
|
|
send_telemetry(
|
|
"Pipeline Run Completed",
|
|
user.id,
|
|
additional_properties={
|
|
"pipeline_name": str(pipeline_name),
|
|
"cognee_version": cognee_version,
|
|
"tenant_id": str(user.tenant_id) if user.tenant_id else "Single User Tenant",
|
|
}
|
|
| config,
|
|
)
|
|
except Exception as error:
|
|
logger.error(
|
|
"Pipeline run errored: `%s`\n%s\n",
|
|
pipeline_name,
|
|
str(error),
|
|
exc_info=True,
|
|
)
|
|
send_telemetry(
|
|
"Pipeline Run Errored",
|
|
user.id,
|
|
additional_properties={
|
|
"pipeline_name": str(pipeline_name),
|
|
"cognee_version": cognee_version,
|
|
"tenant_id": str(user.tenant_id) if user.tenant_id else "Single User Tenant",
|
|
}
|
|
| config,
|
|
)
|
|
|
|
raise error
|