added topology refactor

This commit is contained in:
Vasilije 2024-06-10 17:38:08 +02:00
parent 4f76c4622a
commit 9fd542c28e
2 changed files with 63 additions and 205 deletions

View file

@ -1,155 +1,4 @@
# import csv """ This module contains the TopologyEngine class which is responsible for adding graph topology from a JSON or CSV file. """
# import json
# import os
# import glob
#
# import aiofiles
# from pydantic import BaseModel, Field
# from typing import Dict, List, Optional, Union, Type, Any, Tuple
#
# from cognee import config
# from cognee.base_config import get_base_config
# from cognee.infrastructure.databases.graph import get_graph_config
# from cognee.infrastructure.databases.graph.get_graph_client import get_graph_client
# from cognee.modules.cognify.config import get_cognify_config
# import pandas as pd
# from pydantic import BaseModel, Field
# from typing import Any, Dict, List, Optional, Union
#
#
# class RelationshipModel(BaseModel):
# type: str
# source: str
# target: str
#
#
# class NodeModel(BaseModel):
# node_id: str
# name: str
# default_relationship: Optional[RelationshipModel] = None
# children: List[Union[Dict[str, Any], "NodeModel"]] = Field(default_factory=list)
#
#
# NodeModel.update_forward_refs()
# cognify_config = get_cognify_config()
# base_config = get_base_config()
#
# class Relationship(BaseModel):
# type: str = Field(..., description="The type of relationship, e.g., 'belongs_to'.")
# source: Optional[str] = Field(None, description="The identifier of the source id of in the relationship being a directory or subdirectory")
# target: Optional[str] = Field(None, description="The identifier of the target id in the relationship being the directory, subdirectory or file")
# properties: Optional[Dict[str, Any]] = Field(None, description="A dictionary of additional properties and values related to the relationship.")
#
#
# class Document(BaseModel):
# node_id: str
# title: str
# description: Optional[str] = None
# default_relationship: Relationship
#
#
# class DirectoryModel(BaseModel):
# node_id: str
# path: str
# summary: str
# documents: List[Document] = []
# subdirectories: List['DirectoryModel'] = []
# default_relationship: Relationship
#
#
# DirectoryModel.update_forward_refs()
#
#
# class DirMetadata(BaseModel):
# node_id: str
# summary: str
# owner: str
# description: Optional[str] = None
# directories: List[DirectoryModel] = []
# documents: List[Document] = []
# default_relationship: Relationship
#
#
# class GitHubRepositoryModel(BaseModel):
# node_id: str
# metadata: DirMetadata
# root_directory: DirectoryModel
#
#
# class TopologyEngine:
# def __init__(self) -> None:
# self.models: Dict[str, Type[BaseModel]] = {}
# self.infer = False
# async def flatten_model(self, model: NodeModel, parent_id: Optional[str] = None) -> Dict[str, Any]:
# result = model.dict()
# result["parent_id"] = parent_id
# if model.default_relationship:
# result.update({
# "relationship_type": model.default_relationship.type,
# "relationship_source": model.default_relationship.source,
# "relationship_target": model.default_relationship.target
# })
# return result
#
# async def recursive_flatten(self, items: Union[List[Dict[str, Any]], Dict[str, Any]], parent_id: Optional[str] = None) -> List[Dict[str, Any]]:
# flat_list = []
#
# if isinstance(items, list):
# for item in items:
# flat_list.extend(await self.recursive_flatten(item, parent_id))
# elif isinstance(items, dict):
# item = NodeModel.parse_obj(items)
# flat_list.append(await self.flatten_model(item, parent_id))
# for child in item.children:
# flat_list.extend(await self.recursive_flatten(child, item.node_id))
# return flat_list
#
# async def load_data(self, file_path: str) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
# if file_path.endswith('.json'):
# async with aiofiles.open(file_path, mode='r') as f:
# data = await f.read()
# return json.loads(data)
# elif file_path.endswith('.csv'):
# async with aiofiles.open(file_path, mode='r') as f:
# reader = csv.DictReader(await f.read().splitlines())
# return list(reader)
# else:
# raise ValueError("Unsupported file format")
#
# async def add_graph_topology(self, file_path: str):
# data = await self.load_data(file_path)
#
# flt_topology = await self.recursive_flatten(data)
# print(flt_topology)
# df = pd.DataFrame(flt_topology)
# graph_client = await get_graph_client()
#
# for _, row in df.iterrows():
# node_data = row.to_dict()
# node_id = node_data.pop("node_id", None)
# await graph_client.add_node(node_id, node_data)
# if pd.notna(row["relationship_source"]) and pd.notna(row["relationship_target"]):
# await graph_client.add_edge(row["relationship_source"], row["relationship_target"], relationship_name=row["relationship_type"])
#
# return graph_client.graph
#
#
#
#
# if __name__ == "__main__":
# async def main():
# topology_engine = TopologyEngine()
# file_path = 'example_data.json' # or 'example_data.csv'
#
# # Adding graph topology
# graph = await topology_engine.add_graph_topology(file_path)
# print(graph)
#
#
# import asyncio
# asyncio.run(main())
# # result = engine.extrapolate("GitHubRepositoryModel")
# # print(result)
import csv import csv
import json import json
@ -157,69 +6,21 @@ import aiofiles
import pandas as pd import pandas as pd
from pydantic import BaseModel, Field from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional, Union, Type from typing import Any, Dict, List, Optional, Union, Type
from cognee.infrastructure.databases.graph.get_graph_client import get_graph_client from cognee.infrastructure.databases.graph.get_graph_client import get_graph_client
from cognee.modules.cognify.config import get_cognify_config from cognee.modules.cognify.config import get_cognify_config
from cognee.base_config import get_base_config from cognee.base_config import get_base_config
from cognee.modules.topology.topology_data_models import NodeModel, RelationshipModel, Document, DirectoryModel, DirMetadata, GitHubRepositoryModel
# Define models import asyncio
class RelationshipModel(BaseModel):
type: str
source: str
target: str
class NodeModel(BaseModel):
node_id: str
name: str
default_relationship: Optional[RelationshipModel] = None
children: List[Union[Dict[str, Any], "NodeModel"]] = Field(default_factory=list)
NodeModel.update_forward_refs()
cognify_config = get_cognify_config() cognify_config = get_cognify_config()
base_config = get_base_config() base_config = get_base_config()
class Relationship(BaseModel):
type: str = Field(..., description="The type of relationship, e.g., 'belongs_to'.")
source: Optional[str] = Field(None, description="The identifier of the source id in the relationship.")
target: Optional[str] = Field(None, description="The identifier of the target id in the relationship.")
properties: Optional[Dict[str, Any]] = Field(None, description="A dictionary of additional properties related to the relationship.")
class Document(BaseModel):
node_id: str
title: str
description: Optional[str] = None
default_relationship: Relationship
class DirectoryModel(BaseModel):
node_id: str
path: str
summary: str
documents: List[Document] = []
subdirectories: List['DirectoryModel'] = []
default_relationship: Relationship
DirectoryModel.update_forward_refs()
class DirMetadata(BaseModel):
node_id: str
summary: str
owner: str
description: Optional[str] = None
directories: List[DirectoryModel] = []
documents: List[Document] = []
default_relationship: Relationship
class GitHubRepositoryModel(BaseModel):
node_id: str
metadata: DirMetadata
root_directory: DirectoryModel
class TopologyEngine: class TopologyEngine:
def __init__(self) -> None: def __init__(self) -> None:
self.models: Dict[str, Type[BaseModel]] = {} self.models: Dict[str, Type[BaseModel]] = {}
self.infer = False self.infer = False
async def flatten_model(self, model: NodeModel, parent_id: Optional[str] = None) -> Dict[str, Any]: async def flatten_model(self, model: NodeModel, parent_id: Optional[str] = None) -> Dict[str, Any]:
"""Flatten the model to a dictionary."""
result = model.dict() result = model.dict()
result["parent_id"] = parent_id result["parent_id"] = parent_id
if model.default_relationship: if model.default_relationship:
@ -231,6 +32,7 @@ class TopologyEngine:
return result return result
async def recursive_flatten(self, items: Union[List[Dict[str, Any]], Dict[str, Any]], parent_id: Optional[str] = None) -> List[Dict[str, Any]]: async def recursive_flatten(self, items: Union[List[Dict[str, Any]], Dict[str, Any]], parent_id: Optional[str] = None) -> List[Dict[str, Any]]:
"""Recursively flatten the items. """
flat_list = [] flat_list = []
if isinstance(items, list): if isinstance(items, list):
@ -244,6 +46,7 @@ class TopologyEngine:
return flat_list return flat_list
async def load_data(self, file_path: str) -> Union[List[Dict[str, Any]], Dict[str, Any]]: async def load_data(self, file_path: str) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
"""Load data from a JSON or CSV file."""
try: try:
if file_path.endswith('.json'): if file_path.endswith('.json'):
async with aiofiles.open(file_path, mode='r') as f: async with aiofiles.open(file_path, mode='r') as f:
@ -260,6 +63,7 @@ class TopologyEngine:
raise RuntimeError(f"Failed to load data from {file_path}: {e}") raise RuntimeError(f"Failed to load data from {file_path}: {e}")
async def add_graph_topology(self, file_path: str): async def add_graph_topology(self, file_path: str):
"""Add graph topology from a JSON or CSV file."""
try: try:
data = await self.load_data(file_path) data = await self.load_data(file_path)
flt_topology = await self.recursive_flatten(data) flt_topology = await self.recursive_flatten(data)
@ -277,8 +81,7 @@ class TopologyEngine:
except Exception as e: except Exception as e:
raise RuntimeError(f"Failed to add graph topology from {file_path}: {e}") raise RuntimeError(f"Failed to add graph topology from {file_path}: {e}")
# Example Main Function:
import asyncio
async def main(): async def main():
topology_engine = TopologyEngine() topology_engine = TopologyEngine()

View file

@ -0,0 +1,55 @@
from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional, Union, Type
class Relationship(BaseModel):
type: str = Field(..., description="The type of relationship, e.g., 'belongs_to'.")
source: Optional[str] = Field(None, description="The identifier of the source id in the relationship.")
target: Optional[str] = Field(None, description="The identifier of the target id in the relationship.")
properties: Optional[Dict[str, Any]] = Field(None, description="A dictionary of additional properties related to the relationship.")
class Document(BaseModel):
node_id: str
title: str
description: Optional[str] = None
default_relationship: Relationship
class DirectoryModel(BaseModel):
node_id: str
path: str
summary: str
documents: List[Document] = []
subdirectories: List['DirectoryModel'] = []
default_relationship: Relationship
DirectoryModel.update_forward_refs()
class DirMetadata(BaseModel):
node_id: str
summary: str
owner: str
description: Optional[str] = None
directories: List[DirectoryModel] = []
documents: List[Document] = []
default_relationship: Relationship
class GitHubRepositoryModel(BaseModel):
node_id: str
metadata: DirMetadata
root_directory: DirectoryModel
class RelationshipModel(BaseModel):
type: str
source: str
target: str
class NodeModel(BaseModel):
node_id: str
name: str
default_relationship: Optional[RelationshipModel] = None
children: List[Union[Dict[str, Any], "NodeModel"]] = Field(default_factory=list)
NodeModel.update_forward_refs()