cognee/cognee/tests/low_level/pipeline.py
Boris f75e35c337
fix: custom model pipeline (#508)
<!-- .github/pull_request_template.md -->

## Description
<!-- Provide a clear description of the changes in this PR -->

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.


<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->
## Summary by CodeRabbit


- **New Features**
• Graph visualizations now allow exporting to a user-specified file path
for more flexible output management.
• The text embedding process has been enhanced with an additional
tokenizer option for improved performance.
• A new `ExtendableDataPoint` class has been introduced for future
extensions.
• New JSON files for companies and individuals have been added to
facilitate testing and data processing.

- **Improvements**
• Search functionality now uses updated identifiers for more reliable
content retrieval.
• Metadata handling has been streamlined across various classes by
removing unnecessary type specifications.
• Enhanced serialization of properties in the Neo4j adapter for improved
handling of complex structures.
• The setup process for databases has been improved with a new
asynchronous setup function.

- **Chores**
• Dependency and configuration updates improve overall stability and
performance.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-02-08 02:00:15 +01:00

94 lines
2.6 KiB
Python

import os
import json
import asyncio
from cognee import prune
from cognee import visualize_graph
from cognee.low_level import setup, DataPoint
from cognee.pipelines import run_tasks, Task
from cognee.tasks.storage import add_data_points
from cognee.shared.utils import render_graph
class Person(DataPoint):
    """A person, identified by name; collected in ``Department.employees``."""

    name: str
class Department(DataPoint):
    """A named department together with the ``Person`` entries that belong to it."""

    name: str
    employees: list[Person]
class CompanyType(DataPoint):
    """Type marker referenced by ``Company.is_type``; its name defaults to "Company"."""

    name: str = "Company"
class Company(DataPoint):
    """A named company with its departments and a reference to its ``CompanyType``."""

    name: str
    departments: list[Department]
    is_type: CompanyType
def ingest_files():
    """Build company data points from the JSON fixtures next to this file.

    Reads ``companies.json`` and ``people.json`` from this module's directory.
    Each person becomes a ``Person`` that is grouped into a ``Department``
    keyed by the person's ``"department"`` value. Each company becomes a
    ``Company`` that references the departments listed under its
    ``"departments"`` key and one shared ``CompanyType`` instance.

    Returns:
        A ``dict`` values view over the created ``Company`` objects. Companies
        are keyed by name internally, so a repeated name keeps only the last
        entry.
    """
    base_dir = os.path.dirname(__file__)

    # Context managers close the fixture files deterministically instead of
    # leaving the open handles to the garbage collector.
    companies_file_path = os.path.join(base_dir, "companies.json")
    with open(companies_file_path, "r", encoding="utf-8") as companies_file:
        companies = json.load(companies_file)

    people_file_path = os.path.join(base_dir, "people.json")
    with open(people_file_path, "r", encoding="utf-8") as people_file:
        people = json.load(people_file)

    departments_data_points = {}

    for person in people:
        new_person = Person(name=person["name"])
        department_name = person["department"]

        if department_name not in departments_data_points:
            departments_data_points[department_name] = Department(
                name=department_name, employees=[new_person]
            )
        else:
            departments_data_points[department_name].employees.append(new_person)

    companies_data_points = {}

    # Create a single CompanyType node, so we connect all companies to it.
    company_type = CompanyType()

    for company in companies:
        new_company = Company(name=company["name"], departments=[], is_type=company_type)
        companies_data_points[company["name"]] = new_company

        for department_name in company["departments"]:
            # A department that a company lists but that no person belongs to
            # is still created, with an empty employee list.
            if department_name not in departments_data_points:
                departments_data_points[department_name] = Department(
                    name=department_name, employees=[]
                )

            new_company.departments.append(departments_data_points[department_name])

    return companies_data_points.values()
async def main():
    """Run the low-level example pipeline end to end and render its graph.

    Calls ``prune.prune_data()``, ``prune.prune_system(metadata=True)`` and
    ``setup()``, then runs the ``ingest_files`` -> ``add_data_points`` task
    list, printing every status the pipeline yields. Afterwards it calls
    ``render_graph()`` and writes an HTML preview via ``visualize_graph``.
    """
    await prune.prune_data()
    await prune.prune_system(metadata=True)

    await setup()

    tasks = [Task(ingest_files), Task(add_data_points)]

    async for status in run_tasks(tasks):
        print(status)

    # Get a graphistry url (Register for a free account at https://www.graphistry.com)
    await render_graph()

    # Or use our simple graph preview
    module_dir = os.path.dirname(__file__)
    graph_file_path = os.path.join(module_dir, ".artifacts/graph_visualization.html")
    await visualize_graph(graph_file_path)
# Run the async entry point only when this file is executed as a script.
if __name__ == "__main__":
    asyncio.run(main())