diff --git a/cognee/api/v1/topology/add_topology.py b/cognee/api/v1/topology/add_topology.py index 58d99f21d..c7f42f638 100644 --- a/cognee/api/v1/topology/add_topology.py +++ b/cognee/api/v1/topology/add_topology.py @@ -84,13 +84,13 @@ async def add_topology(directory: str = "example", model: BaseModel = GitHubRepo df = pd.DataFrame(flt_topology) - for _, row in df.iterrows(): - node_data = row.to_dict() - node_id = node_data.pop("node_id") + for _, row in df.iterrows(): + node_data = row.to_dict() + node_id = node_data.pop("node_id") - # Remove "node_id" and get its value - await graph_client.add_node(node_id, node_data) - if pd.notna(row["relationship_source"]) and pd.notna(row["relationship_target"]): - await graph_client.add_edge(row["relationship_source"], row["relationship_target"], relationship_name=row["relationship_type"]) + # Remove "node_id" and get its value + await graph_client.add_node(node_id, node_data) + if pd.notna(row["relationship_source"]) and pd.notna(row["relationship_target"]): + await graph_client.add_edge(row["relationship_source"], row["relationship_target"], relationship_name=row["relationship_type"]) return graph_client.graph diff --git a/cognee/infrastructure/databases/graph/config.py b/cognee/infrastructure/databases/graph/config.py index cc9870d4f..ec9c95db8 100644 --- a/cognee/infrastructure/databases/graph/config.py +++ b/cognee/infrastructure/databases/graph/config.py @@ -19,6 +19,7 @@ class GraphConfig(BaseSettings): ) graph_engine: object = GraphDBType.NETWORKX graph_model: object = KnowledgeGraph + graph_topology: object = KnowledgeGraph model_config = SettingsConfigDict(env_file=".env", extra="allow") diff --git a/cognee/modules/topology/example_data.json b/cognee/modules/topology/example_data.json new file mode 100644 index 000000000..638b1c3fe --- /dev/null +++ b/cognee/modules/topology/example_data.json @@ -0,0 +1,25 @@ +[ + { + "node_id": "1", + "name": "Node 1", + "default_relationship": { + "type": "related_to", + "source": "1", + "target": "2" + }, + "children": [ + { + "node_id": "2", + "name": "Node 2", + "default_relationship": { + "type": "related_to", + "source": "2", + "target": "3" + }, + "children": [] + } + ] + } +] + + diff --git a/cognee/modules/topology/infer_data_topology.py b/cognee/modules/topology/infer_data_topology.py index 5b4683a40..4067998f0 100644 --- a/cognee/modules/topology/infer_data_topology.py +++ b/cognee/modules/topology/infer_data_topology.py @@ -7,12 +7,12 @@ logger = logging.getLogger(__name__) async def infer_data_topology(content: str, graph_topology=None): if graph_topology is None: graph_config = get_graph_config() - graph_topology = graph_config.graph_model + graph_topology = graph_config.graph_topology try: return (await extract_topology( content, graph_topology )) except Exception as error: - logger.error("Error extracting cognitive layers from content: %s", error, exc_info = True) + logger.error("Error extracting topology from content: %s", error, exc_info = True) raise error diff --git a/cognee/modules/topology/topology.py b/cognee/modules/topology/topology.py index bdd1ccad7..739d1dad7 100644 --- a/cognee/modules/topology/topology.py +++ b/cognee/modules/topology/topology.py @@ -1,20 +1,188 @@ -import os -import glob -from pydantic import BaseModel, Field -from typing import Dict, List, Optional, Union, Type, Any, Tuple +# import csv +# import json +# import os +# import glob +# +# import aiofiles +# from pydantic import BaseModel, Field +# from typing import Dict, List, Optional, Union, Type, Any, Tuple +# +# from cognee import config +# from cognee.base_config import get_base_config +# from cognee.infrastructure.databases.graph import get_graph_config +# from cognee.infrastructure.databases.graph.get_graph_client import get_graph_client +# from cognee.modules.cognify.config import get_cognify_config +# import pandas as pd +# from pydantic import BaseModel, Field +# from typing import Any, Dict, List, Optional, Union +# +# +# class RelationshipModel(BaseModel): +# type: str +# source: str +# target: str +# +# +# class NodeModel(BaseModel): +# node_id: str +# name: str +# default_relationship: Optional[RelationshipModel] = None +# children: List[Union[Dict[str, Any], "NodeModel"]] = Field(default_factory=list) +# +# +# NodeModel.update_forward_refs() +# cognify_config = get_cognify_config() +# base_config = get_base_config() +# +# class Relationship(BaseModel): +# type: str = Field(..., description="The type of relationship, e.g., 'belongs_to'.") +# source: Optional[str] = Field(None, description="The identifier of the source id of in the relationship being a directory or subdirectory") +# target: Optional[str] = Field(None, description="The identifier of the target id in the relationship being the directory, subdirectory or file") +# properties: Optional[Dict[str, Any]] = Field(None, description="A dictionary of additional properties and values related to the relationship.") +# +# +# class Document(BaseModel): +# node_id: str +# title: str +# description: Optional[str] = None +# default_relationship: Relationship +# +# +# class DirectoryModel(BaseModel): +# node_id: str +# path: str +# summary: str +# documents: List[Document] = [] +# subdirectories: List['DirectoryModel'] = [] +# default_relationship: Relationship +# +# +# DirectoryModel.update_forward_refs() +# +# +# class DirMetadata(BaseModel): +# node_id: str +# summary: str +# owner: str +# description: Optional[str] = None +# directories: List[DirectoryModel] = [] +# documents: List[Document] = [] +# default_relationship: Relationship +# +# +# class GitHubRepositoryModel(BaseModel): +# node_id: str +# metadata: DirMetadata +# root_directory: DirectoryModel +# +# +# class TopologyEngine: +# def __init__(self) -> None: +# self.models: Dict[str, Type[BaseModel]] = {} +# self.infer = False +# async def flatten_model(self, model: NodeModel, parent_id: Optional[str] = None) -> Dict[str, Any]: +# result = model.dict() +# result["parent_id"] = parent_id +# if model.default_relationship: +# result.update({ +# "relationship_type": model.default_relationship.type, +# "relationship_source": model.default_relationship.source, +# "relationship_target": model.default_relationship.target +# }) +# return result +# +# async def recursive_flatten(self, items: Union[List[Dict[str, Any]], Dict[str, Any]], parent_id: Optional[str] = None) -> List[Dict[str, Any]]: +# flat_list = [] +# +# if isinstance(items, list): +# for item in items: +# flat_list.extend(await self.recursive_flatten(item, parent_id)) +# elif isinstance(items, dict): +# item = NodeModel.parse_obj(items) +# flat_list.append(await self.flatten_model(item, parent_id)) +# for child in item.children: +# flat_list.extend(await self.recursive_flatten(child, item.node_id)) +# return flat_list +# +# async def load_data(self, file_path: str) -> Union[List[Dict[str, Any]], Dict[str, Any]]: +# if file_path.endswith('.json'): +# async with aiofiles.open(file_path, mode='r') as f: +# data = await f.read() +# return json.loads(data) +# elif file_path.endswith('.csv'): +# async with aiofiles.open(file_path, mode='r') as f: +# reader = csv.DictReader(await f.read().splitlines()) +# return list(reader) +# else: +# raise ValueError("Unsupported file format") +# +# async def add_graph_topology(self, file_path: str): +# data = await self.load_data(file_path) +# +# flt_topology = await self.recursive_flatten(data) +# print(flt_topology) +# df = pd.DataFrame(flt_topology) +# graph_client = await get_graph_client() +# +# for _, row in df.iterrows(): +# node_data = row.to_dict() +# node_id = node_data.pop("node_id", None) +# await graph_client.add_node(node_id, node_data) +# if pd.notna(row["relationship_source"]) and pd.notna(row["relationship_target"]): +# await graph_client.add_edge(row["relationship_source"], row["relationship_target"], relationship_name=row["relationship_type"]) +# +# return graph_client.graph +# +# +# +# +# if __name__ == "__main__": +# async def main(): +# topology_engine = TopologyEngine() +# file_path = 'example_data.json' # or 'example_data.csv' +# +# # Adding graph topology +# graph = await topology_engine.add_graph_topology(file_path) +# print(graph) +# +# +# import asyncio +# asyncio.run(main()) +# # result = engine.extrapolate("GitHubRepositoryModel") +# # print(result) -from cognee import config -from cognee.base_config import get_base_config +import csv +import json +import aiofiles +import pandas as pd +from pydantic import BaseModel, Field +from typing import Any, Dict, List, Optional, Union, Type + +from cognee.infrastructure.databases.graph.get_graph_client import get_graph_client from cognee.modules.cognify.config import get_cognify_config +from cognee.base_config import get_base_config + +# Define models +class RelationshipModel(BaseModel): + type: str + source: str + target: str + +class NodeModel(BaseModel): + node_id: str + name: str + default_relationship: Optional[RelationshipModel] = None + children: List[Union[Dict[str, Any], "NodeModel"]] = Field(default_factory=list) + +NodeModel.update_forward_refs() cognify_config = get_cognify_config() base_config = get_base_config() class Relationship(BaseModel): type: str = Field(..., description="The type of relationship, e.g., 'belongs_to'.") - source: Optional[str] = Field(None, description="The identifier of the source id of in the relationship being a directory or subdirectory") - target: Optional[str] = Field(None, description="The identifier of the target id in the relationship being the directory, subdirectory or file") - properties: Optional[Dict[str, Any]] = Field(None, description="A dictionary of additional properties and values related to the relationship.") - + source: Optional[str] = Field(None, description="The identifier of the source id in the relationship.") + target: Optional[str] = Field(None, description="The identifier of the target id in the relationship.") + properties: Optional[Dict[str, Any]] = Field(None, description="A dictionary of additional properties related to the relationship.") class Document(BaseModel): node_id: str @@ -22,7 +190,6 @@ class Document(BaseModel): description: Optional[str] = None default_relationship: Relationship - class DirectoryModel(BaseModel): node_id: str path: str @@ -31,10 +198,8 @@ class DirectoryModel(BaseModel): subdirectories: List['DirectoryModel'] = [] default_relationship: Relationship - DirectoryModel.update_forward_refs() - class DirMetadata(BaseModel): node_id: str summary: str @@ -44,111 +209,85 @@ class DirMetadata(BaseModel): documents: List[Document] = [] default_relationship: Relationship - class GitHubRepositoryModel(BaseModel): node_id: str metadata: DirMetadata root_directory: DirectoryModel - class TopologyEngine: def __init__(self) -> None: self.models: Dict[str, Type[BaseModel]] = {} + self.infer = False - async def populate_model(self, directory_path: str, file_structure: Dict[str, Union[Dict, Tuple[str, ...]]], parent_id: Optional[str] = None) -> DirectoryModel: - directory_id = os.path.basename(directory_path) or "root" - directory = DirectoryModel( - node_id=directory_id, - path=directory_path, - summary=f"Contents of {directory_id}", - default_relationship=Relationship(type="contains", source=parent_id, target=directory_id) - ) + async def flatten_model(self, model: NodeModel, parent_id: Optional[str] = None) -> Dict[str, Any]: + result = model.dict() + result["parent_id"] = parent_id + if model.default_relationship: + result.update({ + "relationship_type": model.default_relationship.type, + "relationship_source": model.default_relationship.source, + "relationship_target": model.default_relationship.target + }) + return result - for key, value in file_structure.items(): - if isinstance(value, dict): - # Recurse into subdirectory - subdirectory_path = os.path.join(directory_path, key) - subdirectory = await self.populate_model(subdirectory_path, value, parent_id=directory_id) - directory.subdirectories.append(subdirectory) - elif isinstance(value, tuple) and value[0] == 'file': - # Handle file - document = Document( - node_id=key, - title=key, - default_relationship=Relationship(type="contained_by", source=key, target=directory_id) - ) - directory.documents.append(document) + async def recursive_flatten(self, items: Union[List[Dict[str, Any]], Dict[str, Any]], parent_id: Optional[str] = None) -> List[Dict[str, Any]]: + flat_list = [] - return directory + if isinstance(items, list): + for item in items: + flat_list.extend(await self.recursive_flatten(item, parent_id)) + elif isinstance(items, dict): + model = NodeModel.parse_obj(items) + flat_list.append(await self.flatten_model(model, parent_id)) + for child in model.children: + flat_list.extend(await self.recursive_flatten(child, model.node_id)) + return flat_list - async def infer_from_directory_structure(self, node_id: str, repository: str, model: Type[BaseModel]) -> GitHubRepositoryModel: - """ Infer the topology of a repository from its file structure """ + async def load_data(self, file_path: str) -> Union[List[Dict[str, Any]], Dict[str, Any]]: + try: + if file_path.endswith('.json'): + async with aiofiles.open(file_path, mode='r') as f: + data = await f.read() + return json.loads(data) + elif file_path.endswith('.csv'): + async with aiofiles.open(file_path, mode='r') as f: + content = await f.read() + reader = csv.DictReader(content.splitlines()) + return list(reader) + else: + raise ValueError("Unsupported file format") + except Exception as e: + raise RuntimeError(f"Failed to load data from {file_path}: {e}") - path = base_config.data_root_directory - path = path + "/" + str(repository) - print(path) + async def add_graph_topology(self, file_path: str): + try: + data = await self.load_data(file_path) + flt_topology = await self.recursive_flatten(data) + df = pd.DataFrame(flt_topology) + graph_client = await get_graph_client() - if not os.path.exists(path): - raise FileNotFoundError(f"No such directory: {path}") + for _, row in df.iterrows(): + node_data = row.to_dict() + node_id = node_data.pop("node_id", None) + await graph_client.add_node(node_id, node_data) + if pd.notna(row.get("relationship_source")) and pd.notna(row.get("relationship_target")): + await graph_client.add_edge(row["relationship_source"], row["relationship_target"], relationship_name=row["relationship_type"]) - root: Dict[str, Union[Dict, Tuple[str, ...]]] = {} - for filename in glob.glob(f"{path}/**", recursive=True): - parts = os.path.relpath(filename, start=path).split(os.path.sep) - current = root - for part in parts[:-1]: # Traverse/create to the last directory - if part not in current: - current[part] = {} - current = current[part] - last_part = parts[-1] - if os.path.isfile(filename): - current[last_part] = ("file", ...) # Placeholder for file content or metadata - elif os.path.isdir(filename): - if last_part not in current: # Only create a new directory entry if it doesn't exist - current[last_part] = {} + return graph_client.graph + except Exception as e: + raise RuntimeError(f"Failed to add graph topology from {file_path}: {e}") - root_directory = await self.populate_model('/', root) +# Example Main Function: +import asyncio - repository_metadata = DirMetadata( - node_id="repo1", - summary="Example repository", - owner="user1", - directories=[root_directory], - documents=[], - default_relationship=Relationship(type="contained_by", source="repo1", target=node_id) - ) - - active_model = GitHubRepositoryModel( - node_id=node_id, - metadata=repository_metadata, - root_directory=root_directory - ) - - return active_model - - def load(self, model_name: str) -> Optional[Type[BaseModel]]: - return self.models.get(model_name) - - def extrapolate(self, model_name: str) -> None: - # This method would be implementation-specific depending on what "extrapolate" means - pass +async def main(): + topology_engine = TopologyEngine() + file_path = 'example_data.json' # or 'example_data.csv' + # Adding graph topology + graph = await topology_engine.add_graph_topology(file_path) + print(graph) +# Run the main function if __name__ == "__main__": - data_directory_path = os.path.abspath("../../../.data") - print(data_directory_path) - config.data_root_directory(data_directory_path) - cognee_directory_path = os.path.abspath("../.cognee_system") - config.system_root_directory(cognee_directory_path) - - async def main() -> None: - engine = TopologyEngine() - # model = engine.load("GitHubRepositoryModel") - # if model is None: - # raise ValueError("Model not found") - result = await engine.infer_from_directory_structure("example_node_id", "example_repo", GitHubRepositoryModel) - print(result) - - import asyncio asyncio.run(main()) - # result = engine.extrapolate("GitHubRepositoryModel") - # print(result)