cognee/cognee/modules/users/methods/get_default_user.py
Igor Ilic bee2fe3ba7
feat: Add initial custom pipeline (#1716)
<!-- .github/pull_request_template.md -->

## Description
Add run_custom_pipeline to have a way to execute a custom collection of tasks in Cognee

## Type of Change
<!-- Please check the relevant option -->
- [ ] Bug fix (non-breaking change that fixes an issue)
- [ ] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Documentation update
- [ ] Code refactoring
- [ ] Performance improvement
- [ ] Other (please specify):

## Screenshots/Videos (if applicable)
<!-- Add screenshots or videos to help explain your changes -->

## Pre-submission Checklist
<!-- Please check all boxes that apply before submitting your PR -->
- [ ] **I have tested my changes thoroughly before submitting this PR**
- [ ] **This PR contains minimal changes necessary to address the
issue/feature**
- [ ] My code follows the project's coding standards and style
guidelines
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have added necessary documentation (if applicable)
- [ ] All new and existing tests pass
- [ ] I have searched existing PRs to ensure this change hasn't been
submitted already
- [ ] I have linked any relevant issues in the description
- [ ] My commits have clear and descriptive messages

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
2025-11-04 17:58:34 +01:00

36 lines
1.4 KiB
Python

from types import SimpleNamespace
from sqlalchemy.orm import selectinload
from sqlalchemy.exc import NoResultFound
from sqlalchemy.future import select
from cognee.modules.users.models import User
from cognee.base_config import get_base_config
from cognee.modules.users.exceptions.exceptions import UserNotFoundError
from cognee.infrastructure.databases.exceptions import DatabaseNotCreatedError
from cognee.infrastructure.databases.relational import get_relational_engine
from cognee.modules.users.methods.create_default_user import create_default_user
async def get_default_user() -> User:
db_engine = get_relational_engine()
base_config = get_base_config()
default_email = base_config.default_user_email or "default_user@example.com"
try:
async with db_engine.get_async_session() as session:
query = (
select(User).options(selectinload(User.roles)).where(User.email == default_email)
)
result = await session.execute(query)
user = result.scalars().first()
if user is None:
return await create_default_user()
return user
except Exception as error:
if "principals" in str(error.args):
raise DatabaseNotCreatedError() from error
if isinstance(error, NoResultFound):
raise UserNotFoundError(f"Failed to retrieve default user: {default_email}") from error
raise