cognee/examples/guides/custom_tasks_and_pipelines.py
2026-01-15 16:20:48 +01:00

53 lines
1.6 KiB
Python

import asyncio
from typing import Any, Dict, List
from pydantic import BaseModel, SkipValidation
import cognee
from cognee.modules.engine.operations.setup import setup
from cognee.infrastructure.llm.LLMGateway import LLMGateway
from cognee.infrastructure.engine import DataPoint
from cognee.tasks.storage import add_data_points
from cognee.modules.pipelines import Task, run_pipeline
class Person(DataPoint):
name: str
# Optional relationships (we'll let the LLM populate this)
knows: List["Person"] = []
# Make names searchable in the vector store
metadata: Dict[str, Any] = {"index_fields": ["name"]}
class People(BaseModel):
persons: List[Person]
async def extract_people(text: str) -> List[Person]:
system_prompt = (
"Extract people mentioned in the text. "
"Return as `persons: Person[]` with each Person having `name` and optional `knows` relations. "
"If the text says someone knows someone set `knows` accordingly. "
"Only include facts explicitly stated."
)
people = await LLMGateway.acreate_structured_output(text, system_prompt, People)
return people.persons
async def main():
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
await setup()
text = "Alice knows Bob."
tasks = [
Task(extract_people), # input: text -> output: list[Person]
Task(add_data_points), # input: list[Person] -> output: list[Person]
]
async for _ in run_pipeline(tasks=tasks, data=text, datasets=["people_demo"]):
pass
if __name__ == "__main__":
asyncio.run(main())