diff --git a/cognee-starter-kit/src/pipelines/low_level.py b/cognee-starter-kit/src/pipelines/low_level.py index 4c4c9d6da..8b4fccf33 100644 --- a/cognee-starter-kit/src/pipelines/low_level.py +++ b/cognee-starter-kit/src/pipelines/low_level.py @@ -1,14 +1,15 @@ import os -import uuid import json import asyncio import pathlib +from typing import List, Any from cognee import config, prune, search, SearchType, visualize_graph from cognee.low_level import setup, DataPoint from cognee.pipelines import run_tasks, Task from cognee.tasks.storage import add_data_points from cognee.tasks.storage.index_graph_edges import index_graph_edges from cognee.modules.users.methods import get_default_user +from cognee.modules.data.methods import load_or_create_datasets class Person(DataPoint): @@ -33,45 +34,51 @@ class Company(DataPoint): metadata: dict = {"index_fields": ["name"]} -def ingest_files(): - companies_file_path = os.path.join(os.path.dirname(__file__), "../data/companies.json") - companies = json.loads(open(companies_file_path, "r").read()) +def ingest_files(data: List[Any]): + if not data or data == [None]: + companies_file_path = os.path.join(os.path.dirname(__file__), "../data/companies.json") + companies = json.loads(open(companies_file_path, "r").read()) - people_file_path = os.path.join(os.path.dirname(__file__), "../data/people.json") - people = json.loads(open(people_file_path, "r").read()) + people_file_path = os.path.join(os.path.dirname(__file__), "../data/people.json") + people = json.loads(open(people_file_path, "r").read()) + + data = [{"companies": companies, "people": people}] people_data_points = {} departments_data_points = {} - - for person in people: - new_person = Person(name=person["name"]) - people_data_points[person["name"]] = new_person - - if person["department"] not in departments_data_points: - departments_data_points[person["department"]] = Department( - name=person["department"], employees=[new_person] - ) - else: - departments_data_points[person["department"]].employees.append(new_person) - companies_data_points = {} - # Create a single CompanyType node, so we connect all companies to it. - companyType = CompanyType() + for data_item in data: + people = data_item["people"] + companies = data_item["companies"] - for company in companies: - new_company = Company(name=company["name"], departments=[], is_type=companyType) - companies_data_points[company["name"]] = new_company + for person in people: + new_person = Person(name=person["name"]) + people_data_points[person["name"]] = new_person - for department_name in company["departments"]: - if department_name not in departments_data_points: - departments_data_points[department_name] = Department( - name=department_name, employees=[] + if person["department"] not in departments_data_points: + departments_data_points[person["department"]] = Department( + name=person["department"], employees=[new_person] ) + else: + departments_data_points[person["department"]].employees.append(new_person) - new_company.departments.append(departments_data_points[department_name]) + # Create a single CompanyType node, so we connect all companies to it. + companyType = CompanyType() - return companies_data_points.values() + for company in companies: + new_company = Company(name=company["name"], departments=[], is_type=companyType) + companies_data_points[company["name"]] = new_company + + for department_name in company["departments"]: + if department_name not in departments_data_points: + departments_data_points[department_name] = Department( + name=department_name, employees=[] + ) + + new_company.departments.append(departments_data_points[department_name]) + + return list(companies_data_points.values()) async def main(): @@ -86,16 +93,17 @@ async def main(): await setup() - # Generate a random dataset_id - dataset_id = uuid.uuid4() + # Get default user user = await get_default_user() + datasets = await load_or_create_datasets(["demo_dataset"], [], user) + pipeline = run_tasks( [ Task(ingest_files), Task(add_data_points), ], - dataset_id, + datasets[0].id, None, user, "demo_pipeline",