From 1de840dc948929b611ab13ac65223cfb2eb4ea1c Mon Sep 17 00:00:00 2001 From: prestonrasmussen Date: Tue, 13 Aug 2024 14:35:43 -0400 Subject: [PATCH] add nodes and edges --- .env.example | 4 +++ core/edges.py | 86 ++++++++++++++++++++++++++++++++++++++++++++++++ core/graphiti.py | 21 ++++++++++++ core/nodes.py | 69 ++++++++++++++++++++++++++++++++++++++ 4 files changed, 180 insertions(+) create mode 100644 .env.example create mode 100644 core/edges.py create mode 100644 core/graphiti.py create mode 100644 core/nodes.py diff --git a/.env.example b/.env.example new file mode 100644 index 00000000..b45253b0 --- /dev/null +++ b/.env.example @@ -0,0 +1,4 @@ +OPENAI_API_KEY= +NEO4J_URI= +NEO4J_USER= +NEO4J_PASSWORD= \ No newline at end of file diff --git a/core/edges.py b/core/edges.py new file mode 100644 index 00000000..686753eb --- /dev/null +++ b/core/edges.py @@ -0,0 +1,86 @@ +from abc import ABC, abstractmethod +from pydantic import BaseModel +from datetime import datetime +from neo4j import AsyncDriver +from uuid import uuid1 +import logging + +from core.nodes import Node + +logger = logging.getLogger(__name__) + + +class Edge(BaseModel, ABC): + uuid: str | None + source_node: Node + target_node: Node + transaction_from: datetime + + @abstractmethod + async def save(self, driver: AsyncDriver): + ... + + +class EpisodicEdge(Edge): + async def save(self, driver: AsyncDriver): + if self.uuid is None: + uuid = uuid1() + logger.info(f'Created uuid: {uuid} for episodic edge') + self.uuid = str(uuid) + + result = await driver.execute_query(""" + MATCH (episode:Episodic {uuid: $episode_uuid}) + MATCH (node:Semantic {uuid: $semantic_uuid}) + MERGE (episode)-[r:MENTIONS {uuid: $uuid}]->(node) + SET r = {uuid: $uuid, transaction_from: $transaction_from} + RETURN r.uuid AS uuid""", + episode_uuid=self.source_node.uuid, semantic_uuid=self.target_node.uuid, + uuid=self.uuid, transaction_from=self.transaction_from) + + logger.info(f'Saved edge to neo4j: {self.uuid}') + + return result + +# TODO: Neo4j doesn't support variables for edge types and labels. +# Right now we have all edge nodes as type RELATES_TO + + +class SemanticEdge(Edge): + name: str + fact: str + fact_embedding: list[int] = None + episodes: list[str] = None # list of episodes that reference these semantic edges + transaction_to: datetime = None # datetime of when the node was invalidated + valid_from: datetime = None # datetime of when the fact became true + valid_to: datetime = None # datetime of when the fact stopped being true + + def generate_embedding(self, embedder, model="text-embedding-3-large"): + text = self.fact.replace("\n", " ") + embedding = embedder.create(input=[text], model=model).data[0].embedding + self.fact_embedding = embedding + + return embedding + + async def save(self, driver: AsyncDriver): + if self.uuid is None: + uuid = uuid1() + logger.info(f'Created uuid: {uuid} for edge with name: {self.name}') + self.uuid = str(uuid) + + result = await driver.execute_query(""" + MATCH (source:Semantic {uuid: $source_uuid}) + MATCH (target:Semantic {uuid: $target_uuid}) + MERGE (source)-[r:RELATES_TO {uuid: $uuid}]->(target) + SET r = {uuid: $uuid, name: $name, fact: $fact, fact_embedding: $fact_embedding, + episodes: $episodes, transaction_from: $transaction_from, transaction_to: $transaction_to, + valid_from: $valid_from, valid_to: $valid_to} + RETURN r.uuid AS uuid""", + source_uuid=self.source_node.uuid, + target_uuid=self.target_node.uuid, uuid=self.uuid, name=self.name, fact=self.fact, + fact_embedding=self.fact_embedding, episodes=self.episodes, + transaction_from=self.transaction_from, transaction_to=self.transaction_to, + valid_from=self.valid_from, valid_to=self.valid_to) + + logger.info(f'Saved Node to neo4j: {self.uuid}') + + return result diff --git a/core/graphiti.py b/core/graphiti.py new file mode 100644 index 00000000..0ad2970f --- /dev/null +++ b/core/graphiti.py @@ -0,0 +1,21 @@ +import asyncio +from typing import Tuple +from datetime import datetime +import logging + +from neo4j import AsyncGraphDatabase +from openai import OpenAI + +from core.nodes import SemanticNode, EpisodicNode, Node +from core.edges import SemanticEdge, EpisodicEdge, Edge + +logger = logging.getLogger(__name__) + + +class Graphiti: + def __init__(self, uri, user, password): + self.driver = AsyncGraphDatabase.driver(uri, auth=(user, password)) + self.database = "neo4j" + + def close(self): + self.driver.close() \ No newline at end of file diff --git a/core/nodes.py b/core/nodes.py new file mode 100644 index 00000000..3d06e8b0 --- /dev/null +++ b/core/nodes.py @@ -0,0 +1,69 @@ +from abc import ABC, abstractmethod +from datetime import datetime +from uuid import uuid1 +from pydantic import BaseModel +from neo4j import AsyncDriver +import logging + +logger = logging.getLogger(__name__) + + +class Node(BaseModel, ABC): + uuid: str | None + name: str + labels: list[str] + transaction_from: datetime + + @abstractmethod + async def save(self, driver: AsyncDriver): + ... + + +class EpisodicNode(Node): + source: str # source type + source_description: str # description of the data source + content: str # raw episode data + semantic_edges: list[str] # list of semantic edges referenced in this episode + valid_from: datetime = None # datetime of when the original document was created + + async def save(self, driver: AsyncDriver): + if self.uuid is None: + uuid = uuid1() + logger.info(f'Created uuid: {uuid} for node with name: {self.name}') + self.uuid = str(uuid) + + result = await driver.execute_query(""" + MERGE (n:Episodic {uuid: $uuid}) + SET n = {uuid: $uuid, name: $name, source_description: $source_description, content: $content, + semantic_edges: $semantic_edges, transaction_from: $transaction_from, valid_from: $valid_from} + RETURN n.uuid AS uuid""", + uuid=self.uuid, name=self.name, source_description=self.source_description, + content=self.content, semantic_edges=self.semantic_edges, + transaction_from=self.transaction_from, valid_from=self.valid_from, _database='neo4j') + + logger.info(f'Saved Node to neo4j: {self.uuid}') + print(self.uuid) + + return result + + +class SemanticNode(Node): + summary: str # regional summary of surrounding edges + + async def save(self, driver: AsyncDriver): + if self.uuid is None: + uuid = uuid1() + logger.info(f'Created uuid: {uuid} for node with name: {self.name}') + self.uuid = str(uuid) + + result = await driver.execute_query(""" + MERGE (n:Semantic {uuid: $uuid}) + SET n = {uuid: $uuid, name: $name, summary: $summary, transaction_from: $transaction_from} + RETURN n.uuid AS uuid""", + uuid=self.uuid, name=self.name, summary=self.summary, + transaction_from=self.transaction_from) + + logger.info(f'Saved Node to neo4j: {self.uuid}') + + return result +