feat: pass context argument to tasks that require it (#788)

<!-- .github/pull_request_template.md -->

## Description
<!-- Provide a clear description of the changes in this PR -->

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.

---------

Co-authored-by: Hande <159312713+hande-k@users.noreply.github.com>
Co-authored-by: Vasilije <8619304+Vasilije1990@users.noreply.github.com>
This commit is contained in:
Boris 2025-04-30 12:32:40 +02:00 committed by GitHub
parent 7db74222ee
commit 5970d964cf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
12 changed files with 247 additions and 22 deletions

View file

@ -32,6 +32,14 @@ Build dynamic Agent memory using scalable, modular ECL (Extract, Cognify, Load)
More on [use-cases](https://docs.cognee.ai/use-cases) and [evals](https://github.com/topoteretes/cognee/tree/main/evals) More on [use-cases](https://docs.cognee.ai/use-cases) and [evals](https://github.com/topoteretes/cognee/tree/main/evals)
<p align="center">
🌐 Available Languages
:
<a href="community/README.pt.md">🇵🇹 Português</a>
·
<a href="community/README.zh.md">🇨🇳 [中文]</a>
</p>
<div style="text-align: center"> <div style="text-align: center">
<img src="https://raw.githubusercontent.com/topoteretes/cognee/refs/heads/main/assets/cognee_benefits.png" alt="Why cognee?" width="50%" /> <img src="https://raw.githubusercontent.com/topoteretes/cognee/refs/heads/main/assets/cognee_benefits.png" alt="Why cognee?" width="50%" />
</div> </div>
@ -116,17 +124,15 @@ Example output:
Natural Language Processing (NLP) is a cross-disciplinary and interdisciplinary field that involves computer science and information retrieval. It focuses on the interaction between computers and human language, enabling machines to understand and process natural language. Natural Language Processing (NLP) is a cross-disciplinary and interdisciplinary field that involves computer science and information retrieval. It focuses on the interaction between computers and human language, enabling machines to understand and process natural language.
``` ```
### cognee UI
### cognee UI
You can also cognify your files and query using cognee UI. You can also cognify your files and query using cognee UI.
<img src="assets/cognee-ui-2.webp" width="100%" alt="Cognee UI 2"></a> <img src="assets/cognee-ui-2.webp" width="100%" alt="Cognee UI 2"></a>
Try cognee UI out locally [here](https://docs.cognee.ai/how-to-guides/cognee-ui). Try cognee UI out locally [here](https://docs.cognee.ai/how-to-guides/cognee-ui).
## Understand our architecture ## Understand our architecture
<div style="text-align: center"> <div style="text-align: center">

View file

@ -20,7 +20,9 @@ from ..tasks.task import Task
logger = get_logger("run_tasks(tasks: [Task], data)") logger = get_logger("run_tasks(tasks: [Task], data)")
async def run_tasks_with_telemetry(tasks: list[Task], data, user: User, pipeline_name: str): async def run_tasks_with_telemetry(
tasks: list[Task], data, user: User, pipeline_name: str, context: dict = None
):
config = get_current_settings() config = get_current_settings()
logger.debug("\nRunning pipeline with configuration:\n%s\n", json.dumps(config, indent=1)) logger.debug("\nRunning pipeline with configuration:\n%s\n", json.dumps(config, indent=1))
@ -36,7 +38,7 @@ async def run_tasks_with_telemetry(tasks: list[Task], data, user: User, pipeline
| config, | config,
) )
async for result in run_tasks_base(tasks, data, user): async for result in run_tasks_base(tasks, data, user, context):
yield result yield result
logger.info("Pipeline run completed: `%s`", pipeline_name) logger.info("Pipeline run completed: `%s`", pipeline_name)
@ -72,6 +74,7 @@ async def run_tasks(
data: Any = None, data: Any = None,
user: User = None, user: User = None,
pipeline_name: str = "unknown_pipeline", pipeline_name: str = "unknown_pipeline",
context: dict = None,
): ):
pipeline_id = uuid5(NAMESPACE_OID, pipeline_name) pipeline_id = uuid5(NAMESPACE_OID, pipeline_name)
@ -82,7 +85,11 @@ async def run_tasks(
try: try:
async for _ in run_tasks_with_telemetry( async for _ in run_tasks_with_telemetry(
tasks=tasks, data=data, user=user, pipeline_name=pipeline_id tasks=tasks,
data=data,
user=user,
pipeline_name=pipeline_id,
context=context,
): ):
pass pass

View file

@ -14,6 +14,7 @@ async def handle_task(
leftover_tasks: list[Task], leftover_tasks: list[Task],
next_task_batch_size: int, next_task_batch_size: int,
user: User, user: User,
context: dict = None,
): ):
"""Handle common task workflow with logging, telemetry, and error handling around the core execution logic.""" """Handle common task workflow with logging, telemetry, and error handling around the core execution logic."""
task_type = running_task.task_type task_type = running_task.task_type
@ -27,9 +28,16 @@ async def handle_task(
}, },
) )
has_context = any(
[key == "context" for key in inspect.signature(running_task.executable).parameters.keys()]
)
if has_context:
args.append(context)
try: try:
async for result_data in running_task.execute(args, next_task_batch_size): async for result_data in running_task.execute(args, next_task_batch_size):
async for result in run_tasks_base(leftover_tasks, result_data, user): async for result in run_tasks_base(leftover_tasks, result_data, user, context):
yield result yield result
logger.info(f"{task_type} task completed: `{running_task.executable.__name__}`") logger.info(f"{task_type} task completed: `{running_task.executable.__name__}`")
@ -55,7 +63,7 @@ async def handle_task(
raise error raise error
async def run_tasks_base(tasks: list[Task], data=None, user: User = None): async def run_tasks_base(tasks: list[Task], data=None, user: User = None, context: dict = None):
"""Base function to execute tasks in a pipeline, handling task type detection and execution.""" """Base function to execute tasks in a pipeline, handling task type detection and execution."""
if len(tasks) == 0: if len(tasks) == 0:
yield data yield data
@ -68,5 +76,7 @@ async def run_tasks_base(tasks: list[Task], data=None, user: User = None):
next_task = leftover_tasks[0] if len(leftover_tasks) > 0 else None next_task = leftover_tasks[0] if len(leftover_tasks) > 0 else None
next_task_batch_size = next_task.task_config["batch_size"] if next_task else 1 next_task_batch_size = next_task.task_config["batch_size"] if next_task else 1
async for result in handle_task(running_task, args, leftover_tasks, next_task_batch_size, user): async for result in handle_task(
running_task, args, leftover_tasks, next_task_batch_size, user, context
):
yield result yield result

View file

@ -312,7 +312,7 @@ def setup_logging(log_level=None, name=None):
root_logger.addHandler(file_handler) root_logger.addHandler(file_handler)
root_logger.setLevel(log_level) root_logger.setLevel(log_level)
if log_level > logging.WARNING: if log_level > logging.DEBUG:
import warnings import warnings
from sqlalchemy.exc import SAWarning from sqlalchemy.exc import SAWarning

View file

@ -1,11 +0,0 @@
import os
import pytest
@pytest.fixture(autouse=True, scope="session")
def copy_cognee_db_to_target_location():
os.makedirs("cognee/.cognee_system/databases/", exist_ok=True)
os.system(
"cp cognee/tests/integration/run_toy_tasks/data/cognee_db cognee/.cognee_system/databases/cognee_db"
)

View file

@ -48,3 +48,7 @@ async def run_and_check_tasks():
def test_run_tasks(): def test_run_tasks():
asyncio.run(run_and_check_tasks()) asyncio.run(run_and_check_tasks())
if __name__ == "__main__":
test_run_tasks()

View file

@ -0,0 +1,47 @@
import asyncio
import cognee
from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.users.methods import get_default_user
from cognee.modules.pipelines.operations.run_tasks import run_tasks_base
from cognee.infrastructure.databases.relational import create_db_and_tables
async def run_and_check_tasks():
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
def task_1(num, context):
return num + context
def task_2(num):
return num * 2
def task_3(num, context):
return num**context
await create_db_and_tables()
user = await get_default_user()
pipeline = run_tasks_base(
[
Task(task_1),
Task(task_2),
Task(task_3),
],
data=5,
user=user,
context=7,
)
final_result = 4586471424
async for result in pipeline:
assert result == final_result
def test_run_tasks():
asyncio.run(run_and_check_tasks())
if __name__ == "__main__":
test_run_tasks()

162
community/README.zh.md Normal file
View file

@ -0,0 +1,162 @@
<div align="center">
<a href="https://github.com/topoteretes/cognee">
<img src="https://raw.githubusercontent.com/topoteretes/cognee/refs/heads/dev/assets/cognee-logo-transparent.png" alt="Cognee Logo" height="60">
</a>
<br />
cognee - AI应用和智能体的记忆层
<p align="center">
<a href="https://www.youtube.com/watch?v=1bezuvLwJmw&t=2s">演示</a>
.
<a href="https://cognee.ai">了解更多</a>
·
<a href="https://discord.gg/NQPKmU5CCg">加入Discord</a>
</p>
[![GitHub forks](https://img.shields.io/github/forks/topoteretes/cognee.svg?style=social&label=Fork&maxAge=2592000)](https://GitHub.com/topoteretes/cognee/network/)
[![GitHub stars](https://img.shields.io/github/stars/topoteretes/cognee.svg?style=social&label=Star&maxAge=2592000)](https://GitHub.com/topoteretes/cognee/stargazers/)
[![GitHub commits](https://badgen.net/github/commits/topoteretes/cognee)](https://GitHub.com/topoteretes/cognee/commit/)
[![Github tag](https://badgen.net/github/tag/topoteretes/cognee)](https://github.com/topoteretes/cognee/tags/)
[![Downloads](https://static.pepy.tech/badge/cognee)](https://pepy.tech/project/cognee)
[![License](https://img.shields.io/github/license/topoteretes/cognee?colorA=00C586&colorB=000000)](https://github.com/topoteretes/cognee/blob/main/LICENSE)
[![Contributors](https://img.shields.io/github/contributors/topoteretes/cognee?colorA=00C586&colorB=000000)](https://github.com/topoteretes/cognee/graphs/contributors)
可靠的AI智能体响应。
使用可扩展、模块化的ECL提取、认知、加载管道构建动态智能体记忆。
更多[使用场景](https://docs.cognee.ai/use_cases)。
<div style="text-align: center">
<img src="cognee_benefits_zh.JPG" alt="为什么选择cognee" width="100%" />
</div>
</div>
## 功能特性
- 互联并检索您的历史对话、文档、图像和音频转录
- 减少幻觉、开发人员工作量和成本
- 仅使用Pydantic将数据加载到图形和向量数据库
- 从30多个数据源摄取数据时进行数据操作
## 开始使用
通过Google Colab <a href="https://colab.research.google.com/drive/1g-Qnx6l_ecHZi0IOw23rg0qC4TYvEvWZ?usp=sharing">笔记本</a><a href="https://github.com/topoteretes/cognee-starter">入门项目</a>快速上手
## 贡献
您的贡献是使这成为真正开源项目的核心。我们**非常感谢**任何贡献。更多信息请参阅[`CONTRIBUTING.md`](CONTRIBUTING.md)。
## 📦 安装
您可以使用**pip**、**poetry**、**uv**或任何其他Python包管理器安装Cognee。
### 使用pip
```bash
pip install cognee
```
## 💻 基本用法
### 设置
```
import os
os.environ["LLM_API_KEY"] = "YOUR OPENAI_API_KEY"
```
您也可以通过创建.env文件设置变量使用我们的<a href="https://github.com/topoteretes/cognee/blob/main/.env.template">模板</a>
要使用不同的LLM提供商请查看我们的<a href="https://docs.cognee.ai">文档</a>获取更多信息。
### 简单示例
此脚本将运行默认管道:
```python
import cognee
import asyncio
async def main():
# Add text to cognee
await cognee.add("自然语言处理NLP是计算机科学和信息检索的跨学科领域。")
# Generate the knowledge graph
await cognee.cognify()
# Query the knowledge graph
results = await cognee.search("告诉我关于NLP")
# Display the results
for result in results:
print(result)
if __name__ == '__main__':
asyncio.run(main())
```
示例输出:
```
自然语言处理NLP是计算机科学和信息检索的跨学科领域。它关注计算机和人类语言之间的交互使机器能够理解和处理自然语言。
```
图形可视化:
<a href="https://rawcdn.githack.com/topoteretes/cognee/refs/heads/main/assets/graph_visualization.html"><img src="https://rawcdn.githack.com/topoteretes/cognee/refs/heads/main/assets/graph_visualization.png" width="100%" alt="图形可视化"></a>
在[浏览器](https://rawcdn.githack.com/topoteretes/cognee/refs/heads/main/assets/graph_visualization.html)中打开。
有关更高级的用法,请查看我们的<a href="https://docs.cognee.ai">文档</a>
## 了解我们的架构
<div style="text-align: center">
<img src="cognee_diagram_zh.JPG" alt="cognee概念图" width="100%" />
</div>
## 演示
1. 什么是AI记忆
[了解cognee](https://github.com/user-attachments/assets/8b2a0050-5ec4-424c-b417-8269971503f0)
2. 简单GraphRAG演示
[简单GraphRAG演示](https://github.com/user-attachments/assets/f57fd9ea-1dc0-4904-86eb-de78519fdc32)
3. cognee与Ollama
[cognee与本地模型](https://github.com/user-attachments/assets/834baf9a-c371-4ecf-92dd-e144bd0eb3f6)
## 行为准则
我们致力于为我们的社区提供愉快和尊重的开源体验。有关更多信息,请参阅<a href="https://github.com/topoteretes/cognee/blob/main/CODE_OF_CONDUCT.md"><code>CODE_OF_CONDUCT</code></a>
## 💫 贡献者
<a href="https://github.com/topoteretes/cognee/graphs/contributors">
<img alt="contributors" src="https://contrib.rocks/image?repo=topoteretes/cognee"/>
</a>
## Star历史
[![Star History Chart](https://api.star-history.com/svg?repos=topoteretes/cognee&type=Date)](https://star-history.com/#topoteretes/cognee&Date)

Binary file not shown.

After

Width:  |  Height:  |  Size: 262 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 181 KiB