added topology to modules from csv, json

This commit is contained in:
Vasilije 2024-06-10 15:58:35 +02:00
parent 91864dc681
commit de972df294
5 changed files with 274 additions and 109 deletions

View file

@ -84,13 +84,13 @@ async def add_topology(directory: str = "example", model: BaseModel = GitHubRepo
df = pd.DataFrame(flt_topology)
for _, row in df.iterrows():
node_data = row.to_dict()
node_id = node_data.pop("node_id")
for _, row in df.iterrows():
node_data = row.to_dict()
node_id = node_data.pop("node_id")
# Remove "node_id" and get its value
await graph_client.add_node(node_id, node_data)
if pd.notna(row["relationship_source"]) and pd.notna(row["relationship_target"]):
await graph_client.add_edge(row["relationship_source"], row["relationship_target"], relationship_name=row["relationship_type"])
# Remove "node_id" and get its value
await graph_client.add_node(node_id, node_data)
if pd.notna(row["relationship_source"]) and pd.notna(row["relationship_target"]):
await graph_client.add_edge(row["relationship_source"], row["relationship_target"], relationship_name=row["relationship_type"])
return graph_client.graph

View file

@ -19,6 +19,7 @@ class GraphConfig(BaseSettings):
)
graph_engine: object = GraphDBType.NETWORKX
graph_model: object = KnowledgeGraph
graph_topology: object = KnowledgeGraph
model_config = SettingsConfigDict(env_file=".env", extra="allow")

View file

@ -0,0 +1,25 @@
[
{
"node_id": "1",
"name": "Node 1",
"default_relationship": {
"type": "related_to",
"source": "1",
"target": "2"
},
"children": [
{
"node_id": "2",
"name": "Node 2",
"default_relationship": {
"type": "related_to",
"source": "2",
"target": "3"
},
"children": []
}
]
}
]

View file

@ -7,12 +7,12 @@ logger = logging.getLogger(__name__)
async def infer_data_topology(content: str, graph_topology=None):
if graph_topology is None:
graph_config = get_graph_config()
graph_topology = graph_config.graph_model
graph_topology = graph_config.graph_topology
try:
return (await extract_topology(
content,
graph_topology
))
except Exception as error:
logger.error("Error extracting cognitive layers from content: %s", error, exc_info = True)
logger.error("Error extracting topology from content: %s", error, exc_info = True)
raise error

View file

@ -1,20 +1,188 @@
import os
import glob
from pydantic import BaseModel, Field
from typing import Dict, List, Optional, Union, Type, Any, Tuple
# import csv
# import json
# import os
# import glob
#
# import aiofiles
# from pydantic import BaseModel, Field
# from typing import Dict, List, Optional, Union, Type, Any, Tuple
#
# from cognee import config
# from cognee.base_config import get_base_config
# from cognee.infrastructure.databases.graph import get_graph_config
# from cognee.infrastructure.databases.graph.get_graph_client import get_graph_client
# from cognee.modules.cognify.config import get_cognify_config
# import pandas as pd
# from pydantic import BaseModel, Field
# from typing import Any, Dict, List, Optional, Union
#
#
# class RelationshipModel(BaseModel):
# type: str
# source: str
# target: str
#
#
# class NodeModel(BaseModel):
# node_id: str
# name: str
# default_relationship: Optional[RelationshipModel] = None
# children: List[Union[Dict[str, Any], "NodeModel"]] = Field(default_factory=list)
#
#
# NodeModel.update_forward_refs()
# cognify_config = get_cognify_config()
# base_config = get_base_config()
#
# class Relationship(BaseModel):
# type: str = Field(..., description="The type of relationship, e.g., 'belongs_to'.")
# source: Optional[str] = Field(None, description="The identifier of the source id of in the relationship being a directory or subdirectory")
# target: Optional[str] = Field(None, description="The identifier of the target id in the relationship being the directory, subdirectory or file")
# properties: Optional[Dict[str, Any]] = Field(None, description="A dictionary of additional properties and values related to the relationship.")
#
#
# class Document(BaseModel):
# node_id: str
# title: str
# description: Optional[str] = None
# default_relationship: Relationship
#
#
# class DirectoryModel(BaseModel):
# node_id: str
# path: str
# summary: str
# documents: List[Document] = []
# subdirectories: List['DirectoryModel'] = []
# default_relationship: Relationship
#
#
# DirectoryModel.update_forward_refs()
#
#
# class DirMetadata(BaseModel):
# node_id: str
# summary: str
# owner: str
# description: Optional[str] = None
# directories: List[DirectoryModel] = []
# documents: List[Document] = []
# default_relationship: Relationship
#
#
# class GitHubRepositoryModel(BaseModel):
# node_id: str
# metadata: DirMetadata
# root_directory: DirectoryModel
#
#
# class TopologyEngine:
# def __init__(self) -> None:
# self.models: Dict[str, Type[BaseModel]] = {}
# self.infer = False
# async def flatten_model(self, model: NodeModel, parent_id: Optional[str] = None) -> Dict[str, Any]:
# result = model.dict()
# result["parent_id"] = parent_id
# if model.default_relationship:
# result.update({
# "relationship_type": model.default_relationship.type,
# "relationship_source": model.default_relationship.source,
# "relationship_target": model.default_relationship.target
# })
# return result
#
# async def recursive_flatten(self, items: Union[List[Dict[str, Any]], Dict[str, Any]], parent_id: Optional[str] = None) -> List[Dict[str, Any]]:
# flat_list = []
#
# if isinstance(items, list):
# for item in items:
# flat_list.extend(await self.recursive_flatten(item, parent_id))
# elif isinstance(items, dict):
# item = NodeModel.parse_obj(items)
# flat_list.append(await self.flatten_model(item, parent_id))
# for child in item.children:
# flat_list.extend(await self.recursive_flatten(child, item.node_id))
# return flat_list
#
# async def load_data(self, file_path: str) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
# if file_path.endswith('.json'):
# async with aiofiles.open(file_path, mode='r') as f:
# data = await f.read()
# return json.loads(data)
# elif file_path.endswith('.csv'):
# async with aiofiles.open(file_path, mode='r') as f:
# reader = csv.DictReader(await f.read().splitlines())
# return list(reader)
# else:
# raise ValueError("Unsupported file format")
#
# async def add_graph_topology(self, file_path: str):
# data = await self.load_data(file_path)
#
# flt_topology = await self.recursive_flatten(data)
# print(flt_topology)
# df = pd.DataFrame(flt_topology)
# graph_client = await get_graph_client()
#
# for _, row in df.iterrows():
# node_data = row.to_dict()
# node_id = node_data.pop("node_id", None)
# await graph_client.add_node(node_id, node_data)
# if pd.notna(row["relationship_source"]) and pd.notna(row["relationship_target"]):
# await graph_client.add_edge(row["relationship_source"], row["relationship_target"], relationship_name=row["relationship_type"])
#
# return graph_client.graph
#
#
#
#
# if __name__ == "__main__":
# async def main():
# topology_engine = TopologyEngine()
# file_path = 'example_data.json' # or 'example_data.csv'
#
# # Adding graph topology
# graph = await topology_engine.add_graph_topology(file_path)
# print(graph)
#
#
# import asyncio
# asyncio.run(main())
# # result = engine.extrapolate("GitHubRepositoryModel")
# # print(result)
from cognee import config
from cognee.base_config import get_base_config
import csv
import json
import aiofiles
import pandas as pd
from pydantic import BaseModel, Field
from typing import Any, Dict, List, Optional, Union, Type
from cognee.infrastructure.databases.graph.get_graph_client import get_graph_client
from cognee.modules.cognify.config import get_cognify_config
from cognee.base_config import get_base_config
# Define models
class RelationshipModel(BaseModel):
type: str
source: str
target: str
class NodeModel(BaseModel):
node_id: str
name: str
default_relationship: Optional[RelationshipModel] = None
children: List[Union[Dict[str, Any], "NodeModel"]] = Field(default_factory=list)
NodeModel.update_forward_refs()
cognify_config = get_cognify_config()
base_config = get_base_config()
class Relationship(BaseModel):
type: str = Field(..., description="The type of relationship, e.g., 'belongs_to'.")
source: Optional[str] = Field(None, description="The identifier of the source id of in the relationship being a directory or subdirectory")
target: Optional[str] = Field(None, description="The identifier of the target id in the relationship being the directory, subdirectory or file")
properties: Optional[Dict[str, Any]] = Field(None, description="A dictionary of additional properties and values related to the relationship.")
source: Optional[str] = Field(None, description="The identifier of the source id in the relationship.")
target: Optional[str] = Field(None, description="The identifier of the target id in the relationship.")
properties: Optional[Dict[str, Any]] = Field(None, description="A dictionary of additional properties related to the relationship.")
class Document(BaseModel):
node_id: str
@ -22,7 +190,6 @@ class Document(BaseModel):
description: Optional[str] = None
default_relationship: Relationship
class DirectoryModel(BaseModel):
node_id: str
path: str
@ -31,10 +198,8 @@ class DirectoryModel(BaseModel):
subdirectories: List['DirectoryModel'] = []
default_relationship: Relationship
DirectoryModel.update_forward_refs()
class DirMetadata(BaseModel):
node_id: str
summary: str
@ -44,111 +209,85 @@ class DirMetadata(BaseModel):
documents: List[Document] = []
default_relationship: Relationship
class GitHubRepositoryModel(BaseModel):
node_id: str
metadata: DirMetadata
root_directory: DirectoryModel
class TopologyEngine:
def __init__(self) -> None:
self.models: Dict[str, Type[BaseModel]] = {}
self.infer = False
async def populate_model(self, directory_path: str, file_structure: Dict[str, Union[Dict, Tuple[str, ...]]], parent_id: Optional[str] = None) -> DirectoryModel:
directory_id = os.path.basename(directory_path) or "root"
directory = DirectoryModel(
node_id=directory_id,
path=directory_path,
summary=f"Contents of {directory_id}",
default_relationship=Relationship(type="contains", source=parent_id, target=directory_id)
)
async def flatten_model(self, model: NodeModel, parent_id: Optional[str] = None) -> Dict[str, Any]:
result = model.dict()
result["parent_id"] = parent_id
if model.default_relationship:
result.update({
"relationship_type": model.default_relationship.type,
"relationship_source": model.default_relationship.source,
"relationship_target": model.default_relationship.target
})
return result
for key, value in file_structure.items():
if isinstance(value, dict):
# Recurse into subdirectory
subdirectory_path = os.path.join(directory_path, key)
subdirectory = await self.populate_model(subdirectory_path, value, parent_id=directory_id)
directory.subdirectories.append(subdirectory)
elif isinstance(value, tuple) and value[0] == 'file':
# Handle file
document = Document(
node_id=key,
title=key,
default_relationship=Relationship(type="contained_by", source=key, target=directory_id)
)
directory.documents.append(document)
async def recursive_flatten(self, items: Union[List[Dict[str, Any]], Dict[str, Any]], parent_id: Optional[str] = None) -> List[Dict[str, Any]]:
flat_list = []
return directory
if isinstance(items, list):
for item in items:
flat_list.extend(await self.recursive_flatten(item, parent_id))
elif isinstance(items, dict):
model = NodeModel.parse_obj(items)
flat_list.append(await self.flatten_model(model, parent_id))
for child in model.children:
flat_list.extend(await self.recursive_flatten(child, model.node_id))
return flat_list
async def infer_from_directory_structure(self, node_id: str, repository: str, model: Type[BaseModel]) -> GitHubRepositoryModel:
""" Infer the topology of a repository from its file structure """
async def load_data(self, file_path: str) -> Union[List[Dict[str, Any]], Dict[str, Any]]:
try:
if file_path.endswith('.json'):
async with aiofiles.open(file_path, mode='r') as f:
data = await f.read()
return json.loads(data)
elif file_path.endswith('.csv'):
async with aiofiles.open(file_path, mode='r') as f:
content = await f.read()
reader = csv.DictReader(content.splitlines())
return list(reader)
else:
raise ValueError("Unsupported file format")
except Exception as e:
raise RuntimeError(f"Failed to load data from {file_path}: {e}")
path = base_config.data_root_directory
path = path + "/" + str(repository)
print(path)
async def add_graph_topology(self, file_path: str):
try:
data = await self.load_data(file_path)
flt_topology = await self.recursive_flatten(data)
df = pd.DataFrame(flt_topology)
graph_client = await get_graph_client()
if not os.path.exists(path):
raise FileNotFoundError(f"No such directory: {path}")
for _, row in df.iterrows():
node_data = row.to_dict()
node_id = node_data.pop("node_id", None)
await graph_client.add_node(node_id, node_data)
if pd.notna(row.get("relationship_source")) and pd.notna(row.get("relationship_target")):
await graph_client.add_edge(row["relationship_source"], row["relationship_target"], relationship_name=row["relationship_type"])
root: Dict[str, Union[Dict, Tuple[str, ...]]] = {}
for filename in glob.glob(f"{path}/**", recursive=True):
parts = os.path.relpath(filename, start=path).split(os.path.sep)
current = root
for part in parts[:-1]: # Traverse/create to the last directory
if part not in current:
current[part] = {}
current = current[part]
last_part = parts[-1]
if os.path.isfile(filename):
current[last_part] = ("file", ...) # Placeholder for file content or metadata
elif os.path.isdir(filename):
if last_part not in current: # Only create a new directory entry if it doesn't exist
current[last_part] = {}
return graph_client.graph
except Exception as e:
raise RuntimeError(f"Failed to add graph topology from {file_path}: {e}")
root_directory = await self.populate_model('/', root)
# Example Main Function:
import asyncio
repository_metadata = DirMetadata(
node_id="repo1",
summary="Example repository",
owner="user1",
directories=[root_directory],
documents=[],
default_relationship=Relationship(type="contained_by", source="repo1", target=node_id)
)
active_model = GitHubRepositoryModel(
node_id=node_id,
metadata=repository_metadata,
root_directory=root_directory
)
return active_model
def load(self, model_name: str) -> Optional[Type[BaseModel]]:
return self.models.get(model_name)
def extrapolate(self, model_name: str) -> None:
# This method would be implementation-specific depending on what "extrapolate" means
pass
async def main():
topology_engine = TopologyEngine()
file_path = 'example_data.json' # or 'example_data.csv'
# Adding graph topology
graph = await topology_engine.add_graph_topology(file_path)
print(graph)
# Run the main function
if __name__ == "__main__":
data_directory_path = os.path.abspath("../../../.data")
print(data_directory_path)
config.data_root_directory(data_directory_path)
cognee_directory_path = os.path.abspath("../.cognee_system")
config.system_root_directory(cognee_directory_path)
async def main() -> None:
engine = TopologyEngine()
# model = engine.load("GitHubRepositoryModel")
# if model is None:
# raise ValueError("Model not found")
result = await engine.infer_from_directory_structure("example_node_id", "example_repo", GitHubRepositoryModel)
print(result)
import asyncio
asyncio.run(main())
# result = engine.extrapolate("GitHubRepositoryModel")
# print(result)