Fix low level pipeline (#1203)


## Description
Fixes the low-level pipeline example and the code paths it exercises:

- `run_tasks` now annotates its inputs as `List[Task]` and `List[Any]`, making explicit that it expects a list of tasks and a list of data items.
- Fixes a comment typo in `index_data_points` (indexable points are sent to the vector engine in batches of 100).
- Reworks the low-level example: the `DataPoint` models carry `index_fields` metadata for vector search, `ingest_files` now receives the pipeline `data` list instead of reading the JSON files itself, and `main()` loads the default user, creates a `test_dataset` dataset via `load_or_create_datasets`, reads `companies.json` / `people.json`, and passes everything to `run_tasks` with an explicit `dataset_id`.

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
Author: Igor Ilic
Committed: 2025-08-05 17:01:48 +02:00 (via GitHub)
Parent: ba624660b9
Commit: 8d4ed35cbe
3 changed files with 63 additions and 34 deletions


```diff
@@ -2,7 +2,7 @@ import os
 import asyncio
 from uuid import UUID
-from typing import Any
+from typing import Any, List
 from functools import wraps
 from sqlalchemy import select
@@ -60,9 +60,9 @@ def override_run_tasks(new_gen):
 @override_run_tasks(run_tasks_distributed)
 async def run_tasks(
-    tasks: list[Task],
+    tasks: List[Task],
     dataset_id: UUID,
-    data: Any = None,
+    data: List[Any] = None,
     user: User = None,
     pipeline_name: str = "unknown_pipeline",
     context: dict = None,
```


```diff
@@ -45,7 +45,7 @@ async def index_data_points(data_points: list[DataPoint]):
         index_name = index_name_and_field[:first_occurence]
         field_name = index_name_and_field[first_occurence + 1 :]
         try:
-            # In case the ammount if indexable points is too large we need to send them in batches
+            # In case the amount of indexable points is too large we need to send them in batches
             batch_size = 100
             for i in range(0, len(indexable_points), batch_size):
                 batch = indexable_points[i : i + batch_size]
```

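As a side note on the batching referenced in the comment above: the points are sliced into fixed-size chunks with `range(start, stop, step)`, so the last chunk simply carries the remainder. A minimal, self-contained sketch of that pattern, with plain integers standing in for the real indexable data points:

```python
# Standalone illustration of the batching pattern above; the integers are
# stand-ins for real indexable data points.
indexable_points = list(range(250))
batch_size = 100

batches = []
for i in range(0, len(indexable_points), batch_size):
    batch = indexable_points[i : i + batch_size]
    batches.append(batch)

print([len(b) for b in batches])  # -> [100, 100, 50]
```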

```diff
@@ -1,69 +1,77 @@
 import os
 import json
 import asyncio
+from typing import List, Any
 
 from cognee import prune
 from cognee import visualize_graph
 from cognee.low_level import setup, DataPoint
+from cognee.modules.data.methods import load_or_create_datasets
+from cognee.modules.users.methods import get_default_user
 from cognee.pipelines import run_tasks, Task
 from cognee.tasks.storage import add_data_points
 
 
 class Person(DataPoint):
     name: str
+    # Metadata "index_fields" specifies which DataPoint fields should be embedded for vector search
+    metadata: dict = {"index_fields": ["name"]}
 
 
 class Department(DataPoint):
     name: str
     employees: list[Person]
+    # Metadata "index_fields" specifies which DataPoint fields should be embedded for vector search
+    metadata: dict = {"index_fields": ["name"]}
 
 
 class CompanyType(DataPoint):
     name: str = "Company"
+    # Metadata "index_fields" specifies which DataPoint fields should be embedded for vector search
+    metadata: dict = {"index_fields": ["name"]}
 
 
 class Company(DataPoint):
     name: str
     departments: list[Department]
     is_type: CompanyType
+    # Metadata "index_fields" specifies which DataPoint fields should be embedded for vector search
+    metadata: dict = {"index_fields": ["name"]}
 
 
-def ingest_files():
-    companies_file_path = os.path.join(os.path.dirname(__file__), "companies.json")
-    companies = json.loads(open(companies_file_path, "r").read())
-
-    people_file_path = os.path.join(os.path.dirname(__file__), "people.json")
-    people = json.loads(open(people_file_path, "r").read())
-
+def ingest_files(data: List[Any]):
     people_data_points = {}
     departments_data_points = {}
-
-    for person in people:
-        new_person = Person(name=person["name"])
-
-        people_data_points[person["name"]] = new_person
-
-        if person["department"] not in departments_data_points:
-            departments_data_points[person["department"]] = Department(
-                name=person["department"], employees=[new_person]
-            )
-        else:
-            departments_data_points[person["department"]].employees.append(new_person)
-
     companies_data_points = {}
 
-    # Create a single CompanyType node, so we connect all companies to it.
-    companyType = CompanyType()
-
-    for company in companies:
-        new_company = Company(name=company["name"], departments=[], is_type=companyType)
-        companies_data_points[company["name"]] = new_company
-
-        for department_name in company["departments"]:
-            if department_name not in departments_data_points:
-                departments_data_points[department_name] = Department(
-                    name=department_name, employees=[]
-                )
-
-            new_company.departments.append(departments_data_points[department_name])
+    for data_item in data:
+        people = data_item["people"]
+        companies = data_item["companies"]
+
+        for person in people:
+            new_person = Person(name=person["name"])
+
+            people_data_points[person["name"]] = new_person
+
+            if person["department"] not in departments_data_points:
+                departments_data_points[person["department"]] = Department(
+                    name=person["department"], employees=[new_person]
+                )
+            else:
+                departments_data_points[person["department"]].employees.append(new_person)
+
+        # Create a single CompanyType node, so we connect all companies to it.
+        companyType = CompanyType()
+
+        for company in companies:
+            new_company = Company(name=company["name"], departments=[], is_type=companyType)
+            companies_data_points[company["name"]] = new_company
+
+            for department_name in company["departments"]:
+                if department_name not in departments_data_points:
+                    departments_data_points[department_name] = Department(
+                        name=department_name, employees=[]
+                    )
+
+                new_company.departments.append(departments_data_points[department_name])
 
     return companies_data_points.values()
```
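To make the new `ingest_files` contract concrete: `run_tasks` now hands the task a list, and each item is expected to be a dict with `"people"` and `"companies"` keys matching the JSON files. Below is a hedged sketch of calling it directly, assuming the models and `ingest_files` from the example above are importable; the sample records are hypothetical.

```python
# Hypothetical sample mirroring the shape of people.json / companies.json;
# assumes Person/Department/Company/CompanyType and ingest_files from the
# example file above.
sample_data = [
    {
        "people": [
            {"name": "Ada", "department": "R&D"},
            {"name": "Grace", "department": "R&D"},
        ],
        "companies": [{"name": "Acme", "departments": ["R&D"]}],
    }
]

companies = list(ingest_files(sample_data))
print([company.name for company in companies])      # -> ['Acme']
print([d.name for d in companies[0].departments])    # -> ['R&D']
print(len(companies[0].departments[0].employees))    # -> 2
```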
```diff
@@ -72,9 +80,30 @@ async def main():
     await prune.prune_data()
     await prune.prune_system(metadata=True)
 
+    # Create relational database tables
     await setup()
 
-    pipeline = run_tasks([Task(ingest_files), Task(add_data_points)])
+    # If no user is provided use default user
+    user = await get_default_user()
+
+    # Create dataset object to keep track of pipeline status
+    datasets = await load_or_create_datasets(["test_dataset"], [], user)
+
+    # Prepare data for pipeline
+    companies_file_path = os.path.join(os.path.dirname(__file__), "companies.json")
+    companies = json.loads(open(companies_file_path, "r").read())
+
+    people_file_path = os.path.join(os.path.dirname(__file__), "people.json")
+    people = json.loads(open(people_file_path, "r").read())
+
+    # Run tasks expects a list of data even if it is just one document
+    data = [{"companies": companies, "people": people}]
+    pipeline = run_tasks(
+        [Task(ingest_files), Task(add_data_points)],
+        dataset_id=datasets[0].id,
+        data=data,
+        incremental_loading=False,
+    )
 
     async for status in pipeline:
         print(status)
```
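One consequence of the new `data: List[Any]` contract worth spelling out: because `run_tasks` takes a list, several documents can be pushed through a single pipeline call, and the reworked `ingest_files` folds them all into one set of data points. The fragment below is a hedged sketch of what the tail of `main()` could look like with a second document added; the `startups.json` / `founders.json` files are hypothetical, everything else mirrors the example above.

```python
# Hedged sketch: multiple documents in one run_tasks call (inside main()).
# The second document's file names are hypothetical.
data = [
    {"companies": companies, "people": people},
    {
        "companies": json.loads(open("startups.json", "r").read()),
        "people": json.loads(open("founders.json", "r").read()),
    },
]

pipeline = run_tasks(
    [Task(ingest_files), Task(add_data_points)],
    dataset_id=datasets[0].id,
    data=data,
    incremental_loading=False,
)

async for status in pipeline:
    print(status)
```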