cognee/cognee/memify_pipelines/persist_sessions_in_knowledge_graph.py
hajdul88 c0e5ce04ce
Fix: fixes session history test for multiuser mode (#1746)
<!-- .github/pull_request_template.md -->

## Description
Fixes failing session history test

## Type of Change
<!-- Please check the relevant option -->
- [x] Bug fix (non-breaking change that fixes an issue)
- [ ] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Documentation update
- [ ] Code refactoring
- [ ] Performance improvement
- [ ] Other (please specify):

## Screenshots/Videos (if applicable)
<!-- Add screenshots or videos to help explain your changes -->

## Pre-submission Checklist
<!-- Please check all boxes that apply before submitting your PR -->
- [x] **I have tested my changes thoroughly before submitting this PR**
- [x] **This PR contains minimal changes necessary to address the
issue/feature**
- [x] My code follows the project's coding standards and style
guidelines
- [x] I have added tests that prove my fix is effective or that my
feature works
- [x] I have added necessary documentation (if applicable)
- [x] All new and existing tests pass
- [x] I have searched existing PRs to ensure this change hasn't been
submitted already
- [x] I have linked any relevant issues in the description
- [x] My commits have clear and descriptive messages

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
2025-11-06 14:13:55 +01:00

55 lines
1.7 KiB
Python

from typing import Optional, List
from cognee import memify
from cognee.context_global_variables import (
set_database_global_context_variables,
set_session_user_context_variable,
)
from cognee.exceptions import CogneeValidationError
from cognee.modules.data.methods import get_authorized_existing_datasets
from cognee.shared.logging_utils import get_logger
from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.users.models import User
from cognee.tasks.memify import extract_user_sessions, cognify_session
logger = get_logger("persist_sessions_in_knowledge_graph")
async def persist_sessions_in_knowledge_graph_pipeline(
user: User,
session_ids: Optional[List[str]] = None,
dataset: str = "main_dataset",
run_in_background: bool = False,
):
await set_session_user_context_variable(user)
dataset_to_write = await get_authorized_existing_datasets(
user=user, datasets=[dataset], permission_type="write"
)
if not dataset_to_write:
raise CogneeValidationError(
message=f"User (id: {str(user.id)}) does not have write access to dataset: {dataset}",
log=False,
)
await set_database_global_context_variables(
dataset_to_write[0].id, dataset_to_write[0].owner_id
)
extraction_tasks = [Task(extract_user_sessions, session_ids=session_ids)]
enrichment_tasks = [
Task(cognify_session, dataset_id=dataset_to_write[0].id),
]
result = await memify(
extraction_tasks=extraction_tasks,
enrichment_tasks=enrichment_tasks,
dataset=dataset_to_write[0].id,
data=[{}],
run_in_background=run_in_background,
)
logger.info("Session persistence pipeline completed")
return result