Merge remote-tracking branch 'origin/feat/COG-24-add-qdrant' into feat/COG-24-add-qdrant

This commit is contained in:
Vasilije 2024-03-13 15:33:18 +01:00
commit 801069b4c0
13 changed files with 221 additions and 488 deletions

File diff suppressed because one or more lines are too long

View file

@ -27,7 +27,7 @@ async def add_standalone(
promises = [] promises = []
for data_item in data: for data_item in data:
promises.append(add(data_item, dataset_id, dataset_name)) promises.append(add_standalone(data_item, dataset_id, dataset_name))
results = await asyncio.gather(*promises) results = await asyncio.gather(*promises)
@ -36,7 +36,7 @@ async def add_standalone(
if is_data_path(data): if is_data_path(data):
with open(data.replace("file://", ""), "rb") as file: with open(data.replace("file://", ""), "rb") as file:
return await add(file, dataset_id, dataset_name) return await add_standalone(file, dataset_id, dataset_name)
classified_data = ingestion.classify(data) classified_data = ingestion.classify(data)

View file

@ -21,39 +21,45 @@ from cognitive_architecture.modules.cognify.llm.classify_content import classify
from cognitive_architecture.modules.cognify.llm.content_to_cog_layers import content_to_cog_layers from cognitive_architecture.modules.cognify.llm.content_to_cog_layers import content_to_cog_layers
from cognitive_architecture.modules.cognify.llm.generate_graph import generate_graph from cognitive_architecture.modules.cognify.llm.generate_graph import generate_graph
from cognitive_architecture.shared.data_models import DefaultContentPrediction, KnowledgeGraph, DefaultCognitiveLayer from cognitive_architecture.shared.data_models import DefaultContentPrediction, KnowledgeGraph, DefaultCognitiveLayer
from cognitive_architecture.modules.cognify.graph.create import create_semantic_graph
from cognitive_architecture.infrastructure.databases.graph.get_graph_client import get_graph_client from cognitive_architecture.infrastructure.databases.graph.get_graph_client import get_graph_client
from cognitive_architecture.shared.data_models import GraphDBType from cognitive_architecture.shared.data_models import GraphDBType
from cognitive_architecture.infrastructure.databases.vector.get_vector_database import get_vector_database from cognitive_architecture.infrastructure.databases.vector.get_vector_database import get_vector_database
from cognitive_architecture.infrastructure.databases.relational import DuckDBAdapter from cognitive_architecture.infrastructure.databases.relational import DuckDBAdapter
from cognitive_architecture.modules.cognify.graph.add_document_node import add_document_node
from cognitive_architecture.modules.cognify.graph.initialize_graph import initialize_graph
config = Config() config = Config()
config.load() config.load()
aclient = instructor.patch(OpenAI()) aclient = instructor.patch(OpenAI())
USER_ID = "default_user"
async def cognify(dataset_name: str): async def cognify(dataset_name: str):
"""This function is responsible for the cognitive processing of the content.""" """This function is responsible for the cognitive processing of the content."""
db = DuckDBAdapter() db = DuckDBAdapter()
files_metadata = db.get_files_metadata(dataset_name) files_metadata = db.get_files_metadata(dataset_name)
files = list(files_metadata["file_path"].values())
awaitables = [] awaitables = []
for file in files: await initialize_graph(USER_ID)
with open(file, "rb") as file:
for file_metadata in files_metadata:
with open(file_metadata["file_path"], "rb") as file:
elements = partition_pdf(file = file, strategy = "fast") elements = partition_pdf(file = file, strategy = "fast")
text = "\n".join(map(lambda element: clean(element.text), elements)) text = "\n".join(map(lambda element: clean(element.text), elements))
awaitables.append(process_text(text)) awaitables.append(process_text(text, file_metadata))
graphs = await asyncio.gather(*awaitables) graphs = await asyncio.gather(*awaitables)
return graphs[0] return graphs[0]
async def process_text(input_text: str): async def process_text(input_text: str, file_metadata: dict):
classified_categories = None print(f"Processing document ({file_metadata['id']})")
classified_categories = []
try: try:
# Classify the content into categories # Classify the content into categories
@ -62,13 +68,17 @@ async def process_text(input_text: str):
"classify_content.txt", "classify_content.txt",
DefaultContentPrediction DefaultContentPrediction
) )
file_metadata["categories"] = list(map(lambda category: category["layer_name"], classified_categories))
except Exception as e: except Exception as e:
print(e) print(e)
raise e raise e
await add_document_node(f"DefaultGraphModel:{USER_ID}", file_metadata)
print(f"Document ({file_metadata['id']}) categorized: {file_metadata['categories']}")
cognitive_layers = await content_to_cog_layers( cognitive_layers = await content_to_cog_layers(
"generate_cog_layers.txt", "generate_cog_layers.txt",
classified_categories, classified_categories[0],
response_model = DefaultCognitiveLayer response_model = DefaultCognitiveLayer
) )
@ -84,73 +94,17 @@ async def process_text(input_text: str):
layer_graphs = await generate_graph_per_layer(input_text, cognitive_layers) layer_graphs = await generate_graph_per_layer(input_text, cognitive_layers)
# print(layer_graphs) # print(layer_graphs)
# ADD SUMMARY print(f"Document ({file_metadata['id']}) layer graphs created")
# ADD CATEGORIES
# Define a GraphModel instance with example data # G = await create_semantic_graph(graph_model_instance)
# graph_model_instance = DefaultGraphModel(
# id="user123", await add_classification_nodes(f"DOCUMENT:{file_metadata['id']}", classified_categories[0])
# documents=[
# Document( unique_layer_uuids = await append_to_graph(layer_graphs, classified_categories[0])
# doc_id = "doc1",
# title = "Document 1", print(f"Document ({file_metadata['id']}) layers connected")
# summary = "Summary of Document 1",
# content_id = "content_id_for_doc1",
# doc_type = DocumentType(type_id = "PDF", description = "Portable Document Format"),
# categories = [
# Category(
# category_id = "finance",
# name = "Finance",
# default_relationship = Relationship(type = "belongs_to")
# ),
# Category(
# category_id = "tech",
# name = "Technology",
# default_relationship = Relationship(type = "belongs_to")
# )
# ],
# default_relationship = Relationship(type="has_document")
# ),
# Document(
# doc_id = "doc2",
# title = "Document 2",
# summary = "Summary of Document 2",
# content_id = "content_id_for_doc2",
# doc_type = DocumentType(type_id = "TXT", description = "Text File"),
# categories = [
# Category(
# category_id = "health",
# name = "Health",
# default_relationship = Relationship(type="belongs_to")
# ),
# Category(
# category_id = "wellness",
# name = "Wellness",
# default_relationship = Relationship(type="belongs_to")
# )
# ],
# default_relationship = Relationship(type = "has_document")
# )
# ],
# user_properties = UserProperties(
# custom_properties = {"age": "30"},
# location = UserLocation(
# location_id = "ny",
# description = "New York",
# default_relationship = Relationship(type = "located_in"))
# ),
# default_fields={
# "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
# "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# }
# )
graph_client = get_graph_client(GraphDBType.NETWORKX) graph_client = get_graph_client(GraphDBType.NETWORKX)
# G = await create_semantic_graph(graph_model_instance, graph_client)
await add_classification_nodes("Document:doc1", classified_categories)
unique_layer_uuids = await append_to_graph(layer_graphs, classified_categories, graph_client)
await graph_client.load_graph_from_file() await graph_client.load_graph_from_file()
@ -169,7 +123,6 @@ async def process_text(input_text: str):
size = 3072 size = 3072
) )
}, },
# Set other configs as needed
) )
try: try:
@ -179,25 +132,14 @@ async def process_text(input_text: str):
except Exception as e: except Exception as e:
print(e) print(e)
# from qdrant_client import QdrantClient
# qdrant = QdrantClient(
# url=os.getenv("QDRANT_URL"),
# api_key=os.getenv("QDRANT_API_KEY"))
#
# collections_response = qdrant.http.collections_api.get_collections()
# collections = collections_response.result.collections
# print(collections)
await add_propositions(node_descriptions) await add_propositions(node_descriptions)
grouped_data = await add_node_connection(node_descriptions) grouped_data = await add_node_connection(node_descriptions)
# print("we are here, grouped_data", grouped_data) # print("we are here, grouped_data", grouped_data)
llm_client = get_llm_client() llm_client = get_llm_client()
relationship_dict = await process_items(grouped_data, unique_layer_uuids, llm_client) relationship_dict = await process_items(grouped_data, unique_layer_uuids, llm_client)
# print("we are here", relationship_dict[0]) # print("we are here", relationship_dict[0])
results = await adapted_qdrant_batch_search(relationship_dict, db) results = await adapted_qdrant_batch_search(relationship_dict, db)
@ -208,23 +150,9 @@ async def process_text(input_text: str):
connect_nodes_in_graph(graph, relationship_d) connect_nodes_in_graph(graph, relationship_d)
return graph print(f"Document ({file_metadata['id']}) processed")
# return graph
# grouped_data = {}
#
# # Iterate through each dictionary in the list
# for item in node_descriptions:
# # Get the layer_decomposition_uuid of the current dictionary
# uuid = item["layer_decomposition_uuid"]
#
# # Check if this uuid is already a key in the grouped_data dictionary
# if uuid not in grouped_data:
# # If not, initialize a new list for this uuid
# grouped_data[uuid] = []
#
# # Append the current dictionary to the list corresponding to its uuid
# grouped_data[uuid].append(item)

View file

@ -91,7 +91,7 @@ class NetworXAdapter(GraphDBInterface):
# async def create(self, user_id, custom_user_properties=None, required_layers=None, default_fields=None, existing_graph=None): # async def create(self, user_id, custom_user_properties=None, required_layers=None, default_fields=None, existing_graph=None):
# """Asynchronously create or update a user content graph based on given parameters.""" # """Asynchronously create or update a user content graph based on given parameters."""
# # Assume required_layers is a dictionary-like object; use more robust validation in production # # Assume required_layers is a dictionary-like object; use more robust validation in production
# category_name = required_layers['context_name'] # category_name = required_layers['data_type']
# subgroup_names = [required_layers['layer_name']] # subgroup_names = [required_layers['layer_name']]
# #
# # Construct the additional_categories structure # # Construct the additional_categories structure

View file

@ -9,14 +9,14 @@ class DuckDBAdapter():
self.db_client = duckdb.connect(db_location) self.db_client = duckdb.connect(db_location)
def get_datasets(self): def get_datasets(self):
tables = self.db_client.sql("SELECT DISTINCT schema_name FROM duckdb_tables();").to_df().to_dict() tables = self.db_client.sql("SELECT DISTINCT schema_name FROM duckdb_tables();").to_df().to_dict("list")
return list( return list(
filter( filter(
lambda table_name: table_name.endswith('staging') is False, lambda table_name: table_name.endswith('staging') is False,
tables["schema_name"].values() tables["schema_name"]
) )
) )
def get_files_metadata(self, dataset_name: str): def get_files_metadata(self, dataset_name: str):
return self.db_client.sql(f"SELECT * FROM {dataset_name}.file_metadata;").to_df().to_dict() return self.db_client.sql(f"SELECT * FROM {dataset_name}.file_metadata;").to_df().to_dict("records")

View file

@ -2,25 +2,25 @@
from cognitive_architecture.infrastructure.databases.graph.get_graph_client import get_graph_client, GraphDBType from cognitive_architecture.infrastructure.databases.graph.get_graph_client import get_graph_client, GraphDBType
async def add_classification_nodes(graph_id, classification_data): async def add_classification_nodes(document_id, classification_data):
graph_client = get_graph_client(GraphDBType.NETWORKX) graph_client = get_graph_client(GraphDBType.NETWORKX)
await graph_client.load_graph_from_file() await graph_client.load_graph_from_file()
context = classification_data["context_name"] data_type = classification_data["data_type"]
layer = classification_data["layer_name"] layer_name = classification_data["layer_name"]
# Create the layer classification node ID using the context_name # Create the layer classification node ID
layer_classification_node_id = f"LLM_LAYER_CLASSIFICATION:{context}:{graph_id}" layer_classification_node_id = f"LLM_LAYER_CLASSIFICATION:{data_type}:{document_id}"
# Add the node to the graph, unpacking the node data from the dictionary # Add the node to the graph, unpacking the node data from the dictionary
await graph_client.add_node(layer_classification_node_id, **classification_data) await graph_client.add_node(layer_classification_node_id, **classification_data)
# Link this node to the corresponding document node # Link this node to the corresponding document node
await graph_client.add_edge(graph_id, layer_classification_node_id, relationship = "classified_as") await graph_client.add_edge(document_id, layer_classification_node_id, relationship = "classified_as")
# Create the detailed classification node ID using the context_name # Create the detailed classification node ID
detailed_classification_node_id = f"LLM_CLASSIFICATION:LAYER:{layer}:{graph_id}" detailed_classification_node_id = f"LLM_CLASSIFICATION:LAYER:{layer_name}:{document_id}"
# Add the detailed classification node, reusing the same node data # Add the detailed classification node, reusing the same node data
await graph_client.add_node(detailed_classification_node_id, **classification_data) await graph_client.add_node(detailed_classification_node_id, **classification_data)
@ -29,22 +29,3 @@ async def add_classification_nodes(graph_id, classification_data):
await graph_client.add_edge(layer_classification_node_id, detailed_classification_node_id, relationship = "contains_analysis") await graph_client.add_edge(layer_classification_node_id, detailed_classification_node_id, relationship = "contains_analysis")
return True return True
# if __name__ == "__main__":
# import asyncio
# # Assuming all necessary imports and GraphDBType, get_graph_client, Document, DocumentType, etc. are defined
# # Initialize the graph client
# graph_client = get_graph_client(GraphDBType.NETWORKX)
# G = asyncio.run(add_classification_nodes(graph_client, "Document:doc1", {"data_type": "text",
# "context_name": "TEXT",
# "layer_name": "Articles, essays, and reports"}))
# from cognitive_architecture.utils import render_graph
# ff = asyncio.run( render_graph(G.graph, graph_type='networkx'))
# print(ff)

View file

@ -0,0 +1,28 @@
from cognitive_architecture.infrastructure.databases.graph.get_graph_client import get_graph_client
from cognitive_architecture.shared.data_models import GraphDBType, Document, DocumentType, Category, Relationship
from .create import add_node_and_edge
def create_category(category_name: str):
return Category(
category_id = category_name.lower(),
name = category_name,
default_relationship = Relationship(type = "belongs_to")
)
async def add_document_node(parent_id, document_data):
graph_client = get_graph_client(GraphDBType.NETWORKX)
await graph_client.load_graph_from_file()
document_id = f"DOCUMENT:{document_data['id']}"
document = Document(
doc_id = document_id,
title = document_data["name"],
doc_type = DocumentType(type_id = "PDF", description = "Portable Document Format"),
categories = list(map(create_category, document_data["categories"])) if "categories" in document_data else [],
)
document_dict = document.model_dump()
relationship = Relationship(type = "has_document").model_dump()
await add_node_and_edge(graph_client, parent_id, document_id, document_dict, relationship)

View file

@ -1,6 +1,6 @@
from networkx import Graph
from cognitive_architecture.infrastructure.databases.graph.get_graph_client import get_graph_client from cognitive_architecture.infrastructure.databases.graph.get_graph_client import get_graph_client
from cognitive_architecture.shared.data_models import GraphDBType from cognitive_architecture.shared.data_models import GraphDBType
from networkx import Graph
async def extract_node_descriptions(data): async def extract_node_descriptions(data):
@ -25,7 +25,6 @@ async def add_node_connection(node_descriptions):
return grouped_data return grouped_data
def connect_nodes_in_graph(graph: Graph, relationship_dict: dict) -> Graph: def connect_nodes_in_graph(graph: Graph, relationship_dict: dict) -> Graph:
""" """
For each relationship in relationship_dict, check if both nodes exist in the graph based on node attributes. For each relationship in relationship_dict, check if both nodes exist in the graph based on node attributes.
@ -37,7 +36,6 @@ def connect_nodes_in_graph(graph: Graph, relationship_dict: dict) -> Graph:
for id, relationships in relationship_dict.items(): for id, relationships in relationship_dict.items():
for relationship in relationships: for relationship in relationships:
searched_node_attr_id = relationship['searched_node_id'] searched_node_attr_id = relationship['searched_node_id']
print(searched_node_attr_id)
score_attr_id = relationship['original_id_for_search'] score_attr_id = relationship['original_id_for_search']
score = relationship['score'] score = relationship['score']
@ -58,8 +56,6 @@ def connect_nodes_in_graph(graph: Graph, relationship_dict: dict) -> Graph:
# Check if both nodes were found in the graph # Check if both nodes were found in the graph
if searched_node_key is not None and score_node_key is not None: if searched_node_key is not None and score_node_key is not None:
print(searched_node_key)
print(score_node_key)
# If both nodes exist, create an edge between them # If both nodes exist, create an edge between them
# You can customize the edge attributes as needed, here we use 'score' as an attribute # You can customize the edge attributes as needed, here we use 'score' as an attribute
graph.add_edge(searched_node_key, score_node_key, weight=score, graph.add_edge(searched_node_key, score_node_key, weight=score,

View file

@ -2,24 +2,29 @@
import uuid import uuid
import json import json
from datetime import datetime from datetime import datetime
from cognitive_architecture.infrastructure.databases.graph.get_graph_client import get_graph_client, GraphDBType
async def add_propositions(graph_client, category_name, subclass_content, layer_description, new_data, layer_uuid, async def add_propositions(
layer_decomposition_uuid): data_type,
layer_name,
layer_description,
new_data,
layer_uuid,
layer_decomposition_uuid
):
""" Add nodes and edges to the graph for the given LLM knowledge graph and the layer""" """ Add nodes and edges to the graph for the given LLM knowledge graph and the layer"""
graph_client = get_graph_client(GraphDBType.NETWORKX)
# Find the node ID for the subclass within the category
await graph_client.load_graph_from_file() await graph_client.load_graph_from_file()
subclass_node_id = None layer_node_id = None
for node, data in graph_client.graph.nodes(data=True): for node_id, data in graph_client.graph.nodes(data = True):
if subclass_content in node: if layer_name in node_id:
subclass_node_id = node layer_node_id = node_id
print(subclass_node_id) if not layer_node_id:
print(f"Subclass '{layer_name}' under category '{data_type}' not found in the graph.")
if not subclass_node_id:
print(f"Subclass '{subclass_content}' under category '{category_name}' not found in the graph.")
return graph_client return graph_client
# Mapping from old node IDs to new node IDs # Mapping from old node IDs to new node IDs
@ -28,19 +33,24 @@ async def add_propositions(graph_client, category_name, subclass_content, layer_
# Add nodes from the Pydantic object # Add nodes from the Pydantic object
for node in new_data.nodes: for node in new_data.nodes:
unique_node_id = uuid.uuid4() unique_node_id = uuid.uuid4()
new_node_id = f"{node.description} - {str(layer_uuid)} - {str(layer_decomposition_uuid)} - {str(unique_node_id)}" new_node_id = f"{node.description} - {str(layer_uuid)} - {str(layer_decomposition_uuid)} - {str(unique_node_id)}"
await graph_client.add_node(new_node_id,
created_at=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), await graph_client.add_node(
updated_at=datetime.now().strftime("%Y-%m-%d %H:%M:%S"), new_node_id,
description=node.description, created_at=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
category=node.category, updated_at=datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
memory_type=node.memory_type, description=node.description,
layer_uuid=str(layer_uuid), category=node.category,
layer_description=str(layer_description), memory_type=node.memory_type,
layer_decomposition_uuid=str(layer_decomposition_uuid), layer_uuid=str(layer_uuid),
unique_id=str(unique_node_id), layer_description=str(layer_description),
type='detail') layer_decomposition_uuid=str(layer_decomposition_uuid),
await graph_client.add_edge(subclass_node_id, new_node_id, relationship='detail') unique_id=str(unique_node_id),
type='detail'
)
await graph_client.add_edge(layer_node_id, new_node_id, relationship='detail')
# Store the mapping from old node ID to new node ID # Store the mapping from old node ID to new node ID
node_id_mapping[node.id] = new_node_id node_id_mapping[node.id] = new_node_id
@ -56,18 +66,16 @@ async def add_propositions(graph_client, category_name, subclass_content, layer_
else: else:
print(f"Could not find mapping for edge from {edge.source} to {edge.target}") print(f"Could not find mapping for edge from {edge.source} to {edge.target}")
return graph_client async def append_to_graph(layer_graphs, required_layers):
async def append_to_graph(layer_graphs, required_layers, graph_client):
# Generate a UUID for the overall layer # Generate a UUID for the overall layer
layer_uuid = uuid.uuid4() layer_uuid = uuid.uuid4()
decomposition_uuids = set() decomposition_uuids = set()
# Extract category name from required_layers data # Extract category name from required_layers data
category_name = required_layers["data_type"] data_type = required_layers["data_type"]
# Extract subgroup name from required_layers data # Extract subgroup name from required_layers data
# Assuming there's always at least one subclass and we're taking the first # Assuming there's always at least one layer and we're taking the first
subgroup_name = required_layers["layer_name"] layer_name = required_layers["layer_name"]
for layer_ind in layer_graphs: for layer_ind in layer_graphs:
@ -77,14 +85,19 @@ async def append_to_graph(layer_graphs, required_layers, graph_client):
# Generate a UUID for this particular layer decomposition # Generate a UUID for this particular layer decomposition
layer_decomposition_uuid = uuid.uuid4() layer_decomposition_uuid = uuid.uuid4()
decomposition_uuids.add(layer_decomposition_uuid) decomposition_uuids.add(layer_decomposition_uuid)
# Assuming append_data_to_graph is defined elsewhere and appends data to graph_client # Assuming append_data_to_graph is defined elsewhere and appends data to graph_client
# You would pass relevant information from knowledge_graph along with other details to this function # You would pass relevant information from knowledge_graph along with other details to this function
await add_propositions(graph_client, category_name, subgroup_name, layer_description, knowledge_graph, await add_propositions(
layer_uuid, layer_decomposition_uuid) data_type,
layer_name,
# Print updated graph for verification (assuming F is the updated NetworkX Graph) layer_description,
print("Updated Nodes:", graph_client.graph.nodes(data=True)) knowledge_graph,
layer_uuid,
layer_decomposition_uuid
)
return decomposition_uuids return decomposition_uuids

View file

@ -1,13 +1,12 @@
""" This module is responsible for creating a semantic graph """ """ This module is responsible for creating a semantic graph """
from datetime import datetime
from typing import Optional, Any from typing import Optional, Any
from pydantic import BaseModel from pydantic import BaseModel
from cognitive_architecture.infrastructure.databases.graph.get_graph_client import get_graph_client from cognitive_architecture.infrastructure.databases.graph.get_graph_client import get_graph_client
from cognitive_architecture.shared.data_models import GraphDBType, DefaultGraphModel, Document, DocumentType, Category, Relationship, UserProperties, UserLocation from cognitive_architecture.shared.data_models import GraphDBType
async def generate_node_id(instance: BaseModel) -> str: async def generate_node_id(instance: BaseModel) -> str:
for field in ['id', 'doc_id', 'location_id', 'type_id']: for field in ["id", "doc_id", "location_id", "type_id"]:
if hasattr(instance, field): if hasattr(instance, field):
return f"{instance.__class__.__name__}:{getattr(instance, field)}" return f"{instance.__class__.__name__}:{getattr(instance, field)}"
return f"{instance.__class__.__name__}:default" return f"{instance.__class__.__name__}:default"
@ -19,100 +18,100 @@ async def add_node_and_edge(client, parent_id: Optional[str], node_id: str, node
await client.add_edge(parent_id, node_id, **relationship_data) await client.add_edge(parent_id, node_id, **relationship_data)
async def process_attribute(G, parent_id: Optional[str], attribute: str, value: Any): async def process_attribute(graph_client, parent_id: Optional[str], attribute: str, value: Any):
if isinstance(value, BaseModel): if isinstance(value, BaseModel):
node_id = await generate_node_id(value) node_id = await generate_node_id(value)
node_data = value.dict(exclude={'default_relationship'})
node_data = value.dict(exclude={"default_relationship"})
# Use the specified default relationship for the edge between the parent node and the current node # Use the specified default relationship for the edge between the parent node and the current node
relationship_data = value.default_relationship.dict() if hasattr(value, 'default_relationship') else {} relationship_data = value.default_relationship.dict() if hasattr(value, "default_relationship") else {}
await add_node_and_edge(G, parent_id, node_id, node_data, relationship_data)
await add_node_and_edge(graph_client, parent_id, node_id, node_data, relationship_data)
# Recursively process nested attributes to ensure all nodes and relationships are added to the graph # Recursively process nested attributes to ensure all nodes and relationships are added to the graph
for sub_attr, sub_val in value.__dict__.items(): # Access attributes and their values directly for sub_attr, sub_val in value.__dict__.items(): # Access attributes and their values directly
await process_attribute(G, node_id, sub_attr, sub_val) await process_attribute(graph_client, node_id, sub_attr, sub_val)
elif isinstance(value, list) and all(isinstance(item, BaseModel) for item in value): elif isinstance(value, list) and all(isinstance(item, BaseModel) for item in value):
# For lists of BaseModel instances, process each item in the list # For lists of BaseModel instances, process each item in the list
for item in value: for item in value:
await process_attribute(G, parent_id, attribute, item) await process_attribute(graph_client, parent_id, attribute, item)
async def create_dynamic(graph_model, client) :
await client.load_graph_from_file()
async def create_dynamic(graph_model) :
root_id = await generate_node_id(graph_model) root_id = await generate_node_id(graph_model)
node_data = graph_model.dict(exclude = {"default_relationship", "id"}) node_data = graph_model.dict(exclude = {"default_relationship", "id"})
print(node_data)
await client.add_node(root_id, **node_data) graph_client = get_graph_client(GraphDBType.NETWORKX)
await graph_client.add_node(root_id, **node_data)
for attribute_name, attribute_value in graph_model: for attribute_name, attribute_value in graph_model:
await process_attribute(client, root_id, attribute_name, attribute_value) await process_attribute(graph_client, root_id, attribute_name, attribute_value)
return client return graph_client
async def create_semantic_graph(graph_model_instance, graph_client): async def create_semantic_graph(graph_model_instance):
await graph_client.load_graph_from_file()
# Dynamic graph creation based on the provided graph model instance # Dynamic graph creation based on the provided graph model instance
graph = await create_dynamic(graph_model_instance, graph_client) graph = await create_dynamic(graph_model_instance)
# Example of adding a node and saving the graph can be demonstrated in the __main__ section or in tests
return graph return graph
if __name__ == "__main__":
import asyncio
# Assuming all necessary imports and GraphDBType, get_graph_client, Document, DocumentType, etc. are defined
# Initialize the graph client # if __name__ == "__main__":
graph_client = get_graph_client(GraphDBType.NETWORKX) # import asyncio
# Define a GraphModel instance with example data # # Assuming all necessary imports and GraphDBType, get_graph_client, Document, DocumentType, etc. are defined
graph_model_instance = DefaultGraphModel(
id="user123",
documents=[
Document(
doc_id="doc1",
title="Document 1",
summary="Summary of Document 1",
content_id="content_id_for_doc1",
doc_type=DocumentType(type_id="PDF", description="Portable Document Format"),
categories=[
Category(category_id="finance", name="Finance", default_relationship=Relationship(type="belongs_to")),
Category(category_id="tech", name="Technology", default_relationship=Relationship(type="belongs_to"))
],
default_relationship=Relationship(type="has_document")
),
Document(
doc_id="doc2",
title="Document 2",
summary="Summary of Document 2",
content_id="content_id_for_doc2",
doc_type=DocumentType(type_id="TXT", description="Text File"),
categories=[
Category(category_id="health", name="Health", default_relationship=Relationship(type="belongs_to")),
Category(category_id="wellness", name="Wellness", default_relationship=Relationship(type="belongs_to"))
],
default_relationship=Relationship(type="has_document")
)
],
user_properties=UserProperties(
custom_properties={"age": "30"},
location=UserLocation(location_id="ny", description="New York", default_relationship=Relationship(type="located_in"))
),
default_fields={
"created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
)
# Run the graph creation asynchronously # # Initialize the graph client
G = asyncio.run(create_semantic_graph(graph_model_instance, graph_client)) # graph_client = get_graph_client(GraphDBType.NETWORKX)
# Optionally, here you can add more nodes, edges, or perform other operations on the graph G # # Define a GraphModel instance with example data
# graph_model_instance = DefaultGraphModel(
# id="user123",
# documents=[
# Document(
# doc_id="doc1",
# title="Document 1",
# summary="Summary of Document 1",
# content_id="content_id_for_doc1",
# doc_type=DocumentType(type_id="PDF", description="Portable Document Format"),
# categories=[
# Category(category_id="finance", name="Finance", default_relationship=Relationship(type="belongs_to")),
# Category(category_id="tech", name="Technology", default_relationship=Relationship(type="belongs_to"))
# ],
# default_relationship=Relationship(type="has_document")
# ),
# Document(
# doc_id="doc2",
# title="Document 2",
# summary="Summary of Document 2",
# content_id="content_id_for_doc2",
# doc_type=DocumentType(type_id="TXT", description="Text File"),
# categories=[
# Category(category_id="health", name="Health", default_relationship=Relationship(type="belongs_to")),
# Category(category_id="wellness", name="Wellness", default_relationship=Relationship(type="belongs_to"))
# ],
# default_relationship=Relationship(type="has_document")
# )
# ],
# user_properties=UserProperties(
# custom_properties={"age": "30"},
# location=UserLocation(location_id="ny", description="New York", default_relationship=Relationship(type="located_in"))
# ),
# default_fields={
# "created_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
# "updated_at": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# }
# )
# # Run the graph creation asynchronously
# G = asyncio.run(create_semantic_graph(graph_model_instance, graph_client))
# # Optionally, here you can add more nodes, edges, or perform other operations on the graph G
# async def create_semantic_graph( # async def create_semantic_graph(
# ): # ):

View file

@ -0,0 +1,22 @@
from datetime import datetime
from cognitive_architecture.shared.data_models import DefaultGraphModel, Relationship, UserProperties, UserLocation
from cognitive_architecture.modules.cognify.graph.create import create_semantic_graph
async def initialize_graph(root_id: str):
    """Create and persist the default semantic graph for a root entity.

    Builds a ``DefaultGraphModel`` rooted at *root_id*, seeded with example
    user properties (a New York location linked via a "located_in"
    relationship) and creation/update timestamps, then hands the model to
    ``create_semantic_graph`` to write it to the configured graph store.

    Args:
        root_id: Identifier used as the graph model's root ``id``.
    """
    # Capture the timestamp once so created_at and updated_at are guaranteed
    # identical at creation time (two now() calls could straddle a second).
    timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

    graph = DefaultGraphModel(
        id=root_id,
        user_properties=UserProperties(
            custom_properties={"age": "30"},
            location=UserLocation(
                location_id="ny",
                description="New York",
                default_relationship=Relationship(type="located_in"),
            ),
        ),
        default_fields={
            "created_at": timestamp,
            "updated_at": timestamp,
        },
    )
    await create_semantic_graph(graph)

View file

@ -1,5 +1,5 @@
""" This module contains the code to classify content into categories using the LLM API. """ """ This module contains the code to classify content into categories using the LLM API. """
from typing import Type from typing import Type, List
from pydantic import BaseModel from pydantic import BaseModel
from cognitive_architecture.infrastructure.llm.get_llm_client import get_llm_client from cognitive_architecture.infrastructure.llm.get_llm_client import get_llm_client
from cognitive_architecture.utils import read_query_prompt from cognitive_architecture.utils import read_query_prompt
@ -13,24 +13,23 @@ async def classify_into_categories(text_input: str, system_prompt_path: str, res
return extract_categories(llm_output.dict()) return extract_categories(llm_output.dict())
def extract_categories(llm_output): def extract_categories(llm_output) -> List[dict]:
# Extract the first subclass from the list (assuming there could be more) # Extract the first subclass from the list (assuming there could be more)
subclass_enum = llm_output["label"]["subclass"][0] layer_enum = llm_output["label"]["subclass"][0]
# The data type is derived from "type" and converted to lowercase # The data type is derived from "type" and converted to lowercase
data_type = llm_output["label"]["type"].lower() data_type = llm_output["label"]["type"].lower()
# The context name is the name of the Enum member (e.g., "NEWS_STORIES") # The layer name is the name of the Enum member (e.g., "NEWS_STORIES")
# context_name = subclass_enum.name.replace("_", " ").title() # layer_name = layer_enum.name.replace("_", " ").title()
# The layer name is the value of the Enum member (e.g., "News stories and blog posts") # The layer name is the value of the Enum member (e.g., "News stories and blog posts")
layer_name = subclass_enum.value layer_name = layer_enum.value
return { return [{
"data_type": data_type, "data_type": data_type,
"context_name": data_type.upper(), # llm context classification
"layer_name": layer_name # llm layer classification "layer_name": layer_name # llm layer classification
} }]
# if __name__ == "__main__": # if __name__ == "__main__":
# import asyncio # import asyncio

View file

@ -10,15 +10,3 @@ async def content_to_cog_layers(filename: str, context, response_model: Type[Bas
formatted_text_input = await async_render_template(filename, context) formatted_text_input = await async_render_template(filename, context)
return await llm_client.acreate_structured_output(formatted_text_input, formatted_text_input, response_model) return await llm_client.acreate_structured_output(formatted_text_input, formatted_text_input, response_model)
# if __name__ == "__main__":
# import asyncio
# asyncio.run(content_to_cog_layers("generate_cog_layers.txt", {
# 'data_type': 'text',
# 'context_name': 'Scientific Research',
# 'layer_name': 'Content Layer'
# }, response_model=CognitiveLayer))