Add utils for graph visualization + classification nodes

This commit is contained in:
Vasilije 2024-03-10 23:27:56 +01:00
parent ed1b44fd7b
commit faf7e6ae59
13 changed files with 781 additions and 625 deletions

File diff suppressed because one or more lines are too long

View file

@@ -1,5 +1,6 @@
import asyncio
import logging
from datetime import datetime
from langchain.prompts import ChatPromptTemplate
import json
@@ -12,6 +13,11 @@ import re
from dotenv import load_dotenv
import os
from cognitive_architecture.infrastructure.databases.vector.qdrant.adapter import CollectionConfig
from cognitive_architecture.modules.cognify.graph.add_classification_nodes import add_classification_nodes
from cognitive_architecture.modules.cognify.graph.add_propositions import append_to_graph
from cognitive_architecture.modules.cognify.vector.load_propositions import add_propositions
# Load environment variables from .env file
load_dotenv()
import instructor
@@ -27,47 +33,22 @@ from cognitive_architecture.modules.cognify.llm.classify_content import classify
from cognitive_architecture.modules.cognify.llm.content_to_cog_layers import content_to_cog_layers
from cognitive_architecture.modules.cognify.llm.content_to_propositions import generate_graph
from cognitive_architecture.shared.data_models import DefaultContentPrediction, KnowledgeGraph, DefaultCognitiveLayer
from cognitive_architecture.modules.cognify.graph.create import create_semantic_graph
from typing import Optional, Any
from pydantic import BaseModel
from cognitive_architecture.infrastructure.databases.graph.get_graph_client import get_graph_client
from cognitive_architecture.shared.data_models import GraphDBType, DefaultGraphModel, Document, DocumentType, Category, Relationship, UserProperties, UserLocation
from qdrant_client import models
from cognitive_architecture.infrastructure.databases.vector.get_vector_database import get_vector_database
async def cognify(input_text: str):
"""This function is responsible for the cognitive processing of the content."""
# Load the content from the text file
input_article_one = input_text
# Classify the content into categories
input_article_one = """ In the nicest possible way, Britons have always been a bit silly about animals. “Keeping pets, for the English, is not so much a leisure activity as it is an entire way of life,” wrote the anthropologist Kate Fox in Watching the English, nearly 20 years ago. Our dogs, in particular, have been an acceptable outlet for emotions and impulses we otherwise keep strictly controlled our latent desire to be demonstratively affectionate, to be silly and chat to strangers. If this seems like an exaggeration, consider the different reactions youd get if you struck up a conversation with someone in a park with a dog, versus someone on the train.
Indeed, British society has been set up to accommodate these four-legged ambassadors. In the UK unlike Australia, say, or New Zealand dogs are not just permitted on public transport but often openly encouraged. Many pubs and shops display waggish signs, reading, Dogs welcome, people tolerated, and have treat jars on their counters. The other day, as I was waiting outside a cafe with a friends dog, the barista urged me to bring her inside.
For years, Britons non-partisan passion for animals has been consistent amid dwindling common ground. But lately, rather than bringing out the best in us, our relationship with dogs is increasingly revealing us at our worst and our supposed best friends are paying the price.
As with so many latent traits in the national psyche, it all came unleashed with the pandemic, when many people thought they might as well make the most of all that time at home and in local parks with a dog. Between 2019 and 2022, the number of pet dogs in the UK rose from about nine million to 13 million. But theres long been a seasonal surge around this time of year, substantial enough for the Dogs Trust charity to coin its famous slogan back in 1978: A dog is for life, not just for Christmas.
Green spaces, meanwhile, have been steadily declining, and now many of us have returned to the office, just as those pandemic dogs are entering their troublesome teens. Its a combustible combination and we are already seeing the results: the number of dog attacks recorded by police in England and Wales rose by more than a third between 2018 and 2022.
At the same time, sites such as Pets4Homes.co.uk are replete with listings for dogs that, their owners accept with deep regret, are no longer suited to their lifestyles now that lockdown is over. It may have felt as if it would go on for ever, but was there ever any suggestion it was going to last the average dogs lifespan of a decade?
Living beings are being downgraded to mere commodities. You can see it reflected the designer breeds currently in fashion, the French bulldogs and pugs that look cute but spend their entire lives in discomfort. American XL bully dogs, now so controversial, are often sought after as a signifier of masculinity: roping an entire other life in service of our egos. Historically, many of Britains most popular breeds evolved to hunt vermin, retrieve game, herd, or otherwise do a specific job alongside humans; these days we are breeding and buying them for their aesthetic appeal.
Underpinning this is a shift to what was long disdained as the American approach: treating pets as substitutes for children. In the past in Britain, dogs were treasured on their own terms, for the qualities that made them dogs, and as such, sometimes better than people: their friendliness and trustingness and how they opened up the world for us. They were indulged, certainly by allowing them on to the sofa or in our beds, for instance, when wed sworn we never would but in ways that did not negate or deny their essential otherness.
Now we have more dogs of such ludicrous proportions, they struggle to function as dogs at all and we treat them accordingly, indulging them as we would ourselves: by buying unnecessary things. The total spend on pets in the UK has more than doubled in the past decade, reaching nearly £10bn last year. That huge rise has not just come from essentials: figures from the marketing agency Mintel suggest that one in five UK owners like their pet to keep up with the latest trends in grooming or, heaven forbid, outfits.
These days pet boutiques like the one that recently opened on my street in Norwich, selling cold-pressed dog treats, paw and nose balms and spa services are a widespread sign of gentrification. But its not just wealthier areas: this summer in Great Yarmouth, one of the most deprived towns in the country, I noticed seaside stalls selling not one but two brands of ice-cream for dogs.
It suggests dog-lovers have become untethered from their companions desires, let alone their needs. Lets be honest: most dogs would be thrilled to bits to be eating a paper bag, or even their own faeces. And although they are certainly delighted by ice-cream, they dont need it. But the ways we ourselves find solace in consumption, by indulging our simian treat brain with things that we dont need and/or arent good for us we have simply extended to our pets.
Its hard not to see the rise in dog-friendly restaurants, cinema screenings and even churches as similar to the ludicrous expenditure: a way to placate the two-legged being on the end of the lead (regardless of the experience of others in the vicinity).
Meanwhile, many dogs suffer daily deprivation, their worlds made small and monotonous by our busy modern schedules. These are social animals: its not natural for them to live without other dogs, let alone in an empty house for eight hours a day, Monday to Friday. If we are besieged by badly behaved dogs, the cause isnt hard to pinpoint. Many behavioural problems can be alleviated and even addressed by sufficient exercise, supervision and consistent routines, but instead of organising our lives so that our pets may thrive, we show our love with a Halloween-themed cookie, or a new outfit for Instagram likes.
Its easy to forget that we are sharing our homes with a descendant of the wolf when it is dressed in sheeps clothing; but the more we learn about animals, the clearer it becomes that our treatment of them, simultaneously adoring and alienated, means they are leading strange, unsatisfying simulacra of the lives they ought to lead.
But for as long as we choose to share our lives with pets, the bar should be the same as for any relationship we value: being prepared to make sacrifices for their wellbeing, prioritising quality time and care, and loving them as they are not for how they reflect on us, or how wed like them to be.
"""
required_layers_one = await classify_into_categories(input_article_one, "classify_content.txt",
DefaultContentPrediction)
@@ -112,4 +93,97 @@ async def cognify():
return graphs
# Run the async function for each set of cognitive layers
layer_1_graph = await async_graph_per_layer(input_article_one, cognitive_layers_one)
print(layer_1_graph)
graph_client = get_graph_client(GraphDBType.NETWORKX)
# Define a GraphModel instance with example data
graph_model_instance = DefaultGraphModel(
id="user123",
documents=[
Document(
doc_id="doc1",
title="Document 1",
summary="Summary of Document 1",
content_id="content_id_for_doc1",
doc_type=DocumentType(type_id="PDF", description="Portable Document Format"),
categories=[
Category(category_id="finance", name="Finance",
default_relationship=Relationship(type="belongs_to")),
Category(category_id="tech", name="Technology",
default_relationship=Relationship(type="belongs_to"))
],
default_relationship=Relationship(type='has_document')
),
Document(
doc_id="doc2",
title="Document 2",
summary="Summary of Document 2",
content_id="content_id_for_doc2",
doc_type=DocumentType(type_id="TXT", description="Text File"),
categories=[
Category(category_id="health", name="Health", default_relationship=Relationship(type="belongs_to")),
Category(category_id="wellness", name="Wellness",
default_relationship=Relationship(type="belongs_to"))
],
default_relationship=Relationship(type='has_document')
)
],
user_properties=UserProperties(
custom_properties={"age": "30"},
location=UserLocation(location_id="ny", description="New York",
default_relationship=Relationship(type='located_in'))
),
default_fields={
'created_at': datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
'updated_at': datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
)
G = await create_semantic_graph(graph_model_instance, graph_client)
await add_classification_nodes(graph_client, 'Document:doc1', transformed_dict_1)
await append_to_graph(layer_1_graph, required_layers_one, graph_client)
def extract_node_descriptions(data):
descriptions = []
for node_id, attributes in data:
if 'description' in attributes and 'id' in attributes:
descriptions.append({'node_id': attributes['id'], 'description': attributes['description'],
'layer_uuid': attributes['layer_uuid'],
'layer_decomposition_uuid': attributes['layer_decomposition_uuid']})
return descriptions
# Extract the node descriptions
graph = await graph_client.graph()
node_descriptions = extract_node_descriptions(graph.nodes(data=True))
unique_layer_uuids = set(node['layer_decomposition_uuid'] for node in node_descriptions)
db = get_vector_database()
collection_config = CollectionConfig(
vector_config={
'content': models.VectorParams(
distance=models.Distance.COSINE,
size=3072
)
},
# Set other configs as needed
)
for layer in unique_layer_uuids:
await db.create_collection(layer, collection_config)
# Load the node propositions into the per-layer collections to check the pipeline end to end
await add_propositions(node_descriptions, db)
if __name__ == "__main__":
asyncio.run(cognify("The quick brown fox jumps over the lazy dog"))

View file

@@ -1,10 +1,10 @@
from typing import List, Optional, Dict
from pydantic import BaseModel, Field
from qdrant_client import AsyncQdrantClient, models
from ..vector_db_interface import VectorDBInterface
class CollectionConfig(BaseModel, extra = "forbid"):
vector_config: Dict[str, models.VectorParams] = Field(..., description="Vectors configuration")
hnsw_config: Optional[models.HnswConfig] = Field(default = None, description="HNSW vector index configuration")
optimizers_config: Optional[models.OptimizersConfig] = Field(default = None, description="Optimizers configuration")
quantization_config: Optional[models.QuantizationConfig] = Field(default = None, description="Quantization configuration")
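With vector_config now a Dict keyed by vector name, collections are declared with named vectors. A minimal sketch of the new shape, mirroring the "content" vector that the cognify pipeline above creates (the COSINE distance and 3072 size come from that call, not from the adapter itself):

from qdrant_client import models

collection_config = CollectionConfig(
    vector_config={
        "content": models.VectorParams(
            distance=models.Distance.COSINE,
            size=3072,  # dimension of the text-embedding-3-large vectors used upstream
        )
    }
)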
@@ -59,3 +59,50 @@ class QDrantAdapter(VectorDBInterface):
collection_name = collection_name,
points = data_points
)
async def batch_search(self, collection_name: str, embeddings: List[List[float]],
with_vectors: List[bool] = None):
"""
Perform batch search in a Qdrant collection with dynamic search requests.
Args:
- collection_name (str): Name of the collection to search in.
- embeddings (List[List[float]]): List of embeddings to search for.
- with_vectors (List[bool], optional): Per-request flags for returning stored vectors.
Defaults to None, in which case vectors are not returned.
Returns:
- results: The search results from Qdrant, capped at 3 hits per request.
"""
client = self.get_qdrant_client()
# Default with_vectors to False for each request if not provided
if with_vectors is None:
with_vectors = [False] * len(embeddings)
# Ensure the with_vectors list matches the length of embeddings
if len(with_vectors) != len(embeddings):
raise ValueError("The length of with_vectors must match the length of embeddings")
# Generate dynamic search requests based on the provided embeddings
requests = [
models.SearchRequest(
vector=models.NamedVector(
name="content",
vector=embedding,
),
limit=3,
with_vector=with_vector,
) for embedding, with_vector in zip(embeddings, with_vectors)
]
# Perform batch search with the dynamically generated requests
results = await client.search_batch(
collection_name=collection_name,
requests=requests
)
return results
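A hedged usage sketch for batch_search (assumes an initialized QDrantAdapter named adapter and query vectors sized to the collection's "content" vector):

embeddings = [[0.0] * 3072, [0.1] * 3072]  # placeholder query vectors
results = await adapter.batch_search(
    collection_name="layer-uuid",  # hypothetical per-layer collection
    embeddings=embeddings,
)
# one list of scored points per request, each capped at 3 hits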

View file

@@ -68,17 +68,18 @@ class VectorDBInterface(Protocol):
# data_point_id: str
# ): raise NotImplementedError
""" Search """
# @abstractmethod
# async def search(
# self,
# collection_name: str,
# query: object
# ): raise NotImplementedError
@abstractmethod
async def batch_search(
self,
collection_name: str,
embeddings: List[List[float]],
with_vectors: List[bool] = None
): raise NotImplementedError

View file

@@ -134,21 +134,24 @@ class OpenAIAdapter(LLMInterface):
return await openai.chat.completions.acreate(**kwargs)
@aretry_with_exponential_backoff
async def acreate_embedding_with_backoff(self, input: List[str], model: str = "text-embedding-ada-002"):
"""Wrapper around Embedding.acreate w/ backoff"""
# client = openai.AsyncOpenAI(
#     # This is the default and can be omitted
#     api_key=os.environ.get("OPENAI_API_KEY"),
# )
return await self.aclient.embeddings.create(input=input, model=model)
async def async_get_embedding_with_backoff(self, text, model="text-embedding-ada-002"):
"""To get text embeddings, import/call this function
It specifies defaults + handles rate-limiting + is async"""
text = text.replace("\n", " ")
response = await self.acreate_embedding_with_backoff(input=[text], model=model)
embedding = response.data[0].embedding
return embedding
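A short usage sketch (assumes an OpenAIAdapter instance named llm_client, e.g. obtained via get_llm_client()):

embedding = await llm_client.async_get_embedding_with_backoff("a sentence to embed")
print(len(embedding))  # 1536 dimensions for text-embedding-ada-002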
@@ -172,7 +175,7 @@ class OpenAIAdapter(LLMInterface):
"""To get multiple text embeddings in parallel, import/call this function
It specifies defaults + handles rate-limiting + is async"""
# Create a generator of coroutines
# Note: no await here; the un-awaited coroutines are gathered below so they run concurrently
coroutines = (self.async_get_embedding_with_backoff(text, model)
for text, model in zip(texts, models))
# Run the coroutines in parallel and gather the results

View file

@@ -1,5 +1,6 @@
""" Here we update semantic graph with content that classifier produced"""
import uuid
import json
from datetime import datetime
from enum import Enum, auto
from typing import Type, Optional, Any
@@ -64,7 +65,35 @@ async def add_propositions(G, category_name, subclass_content, layer_description
return G
async def append_to_graph(layer_graphs, required_layers, G):
# Generate a UUID for the overall layer
layer_uuid = uuid.uuid4()
# Extract category name from required_layers data
category_name = required_layers.dict()['label']['type']
# Extract subgroup name from required_layers data
# Assuming there's always at least one subclass and we're taking the first
subgroup_name = required_layers.dict()['label']['subclass'][0].value # Access the value of the enum
for layer_ind in layer_graphs:
for layer_json, knowledge_graph in layer_ind.items():
# Decode the JSON key to get the layer description
layer_description = json.loads(layer_json)
# Generate a UUID for this particular layer decomposition
layer_decomposition_uuid = uuid.uuid4()
# add_propositions (defined above) appends this layer's knowledge graph to G,
# tagged with the category, subgroup and layer identifiers
F = await add_propositions(G, category_name, subgroup_name, layer_description, knowledge_graph,
layer_uuid, layer_decomposition_uuid)
# Print updated graph for verification (assuming F is the updated NetworkX Graph)
print("Updated Nodes:", F.graph.nodes(data=True))
return F
if __name__ == "__main__":
@@ -113,39 +142,6 @@ if __name__ == "__main__":
)
# Run the async function for each set of cognitive layers
level_1_graph = asyncio.run( async_graph_per_layer(input_article_one, cognitive_layers_one))
G = asyncio.run(append_to_graph(level_1_graph, required_layers_one, graph_client))

View file

@@ -0,0 +1,101 @@
import asyncio
from cognitive_architecture.infrastructure.llm.get_llm_client import get_llm_client
async def process_items(grouped_data, unique_layer_uuids):
llm_client = get_llm_client()  # the embedding helper lives on the LLM client
results_to_check = [] # This will hold results excluding self comparisons
tasks = [] # List to hold all tasks
task_to_info = {} # Dictionary to map tasks to their corresponding group id and item info
# Iterate through each group in grouped_data
for group_id, items in grouped_data.items():
# Filter unique_layer_uuids to exclude the current group_id
target_uuids = [uuid for uuid in unique_layer_uuids if uuid != group_id]
# Process each item in the group
for item in items:
# For each target UUID, create an async task for the item's embedding retrieval
for target_id in target_uuids:
task = asyncio.create_task(
llm_client.async_get_embedding_with_backoff(item['description'], "text-embedding-3-large"))
tasks.append(task)
# Map the task to the target id, item's node_id, and description for later retrieval
task_to_info[task] = (target_id, item['node_id'], group_id, item['description'])
# Await all tasks to complete and gather results
results = await asyncio.gather(*tasks)
# Process the results, associating them with their target id, node id, and description
for task, embedding in zip(tasks, results):
target_id, node_id, group_id, description = task_to_info[task]
results_to_check.append([target_id, embedding, description, node_id, group_id])
return results_to_check
async def graph_ready_output(results):
relationship_dict = {}
for result_tuple in results:
uuid, scored_points_list, desc, node_id = result_tuple
# Unpack the tuple
# Ensure there's a list to collect related items for this uuid
if uuid not in relationship_dict:
relationship_dict[uuid] = []
for scored_points in scored_points_list: # Iterate over the list of ScoredPoint lists
for scored_point in scored_points: # Iterate over each ScoredPoint object
if scored_point.score > 0.9: # Check the score condition
# Append a new dictionary to the list associated with the uuid
relationship_dict[uuid].append({
'collection_name_uuid': uuid,
'searched_node_id': scored_point.id,
'score': scored_point.score,
'score_metadata': scored_point.payload,
'original_id_for_search': node_id,
})
return relationship_dict
async def connect_nodes_in_graph(graph, relationship_dict):
"""
For each relationship in relationship_dict, check if both nodes exist in the graph based on node attributes.
If they do, create a connection (edge) between them.
:param graph: A NetworkX graph object
:param relationship_dict: A dictionary containing relationships between nodes
"""
for id, relationships in relationship_dict.items():
for relationship in relationships:
searched_node_attr_id = relationship['searched_node_id']
print(searched_node_attr_id)
score_attr_id = relationship['original_id_for_search']
score = relationship['score']
# Initialize node keys for both searched_node and score_node
searched_node_key, score_node_key = None, None
# Find nodes in the graph that match the searched_node_id and score_id from their attributes
for node, attrs in graph.nodes(data=True):
if 'id' in attrs: # Ensure there is an 'id' attribute
if attrs['id'] == searched_node_attr_id:
searched_node_key = node
elif attrs['id'] == score_attr_id:
score_node_key = node
# If both nodes are found, no need to continue checking other nodes
if searched_node_key and score_node_key:
break
# Check if both nodes were found in the graph
if searched_node_key is not None and score_node_key is not None:
print(searched_node_key)
print(score_node_key)
# If both nodes exist, create an edge between them
# You can customize the edge attributes as needed, here we use 'score' as an attribute
graph.add_edge(searched_node_key, score_node_key, weight=score, score_metadata=relationship.get('score_metadata'))
return graph

View file

@@ -1,7 +1,7 @@
""" This module is responsible for creating a semantic graph """
from datetime import datetime
from enum import Enum, auto
from typing import Type, Optional, Any
from typing import Optional, Any
from pydantic import BaseModel
from cognitive_architecture.infrastructure.databases.graph.get_graph_client import get_graph_client
from cognitive_architecture.shared.data_models import GraphDBType, DefaultGraphModel, Document, DocumentType, Category, Relationship, UserProperties, UserLocation

View file

@@ -0,0 +1,29 @@
from cognitive_architecture.infrastructure.databases.vector.get_vector_database import get_vector_database
async def adapted_qdrant_batch_search(results_to_check, client):
search_results_list = []
for result in results_to_check:
# Each row of results_to_check is [target_collection_id, embedding, description, node_id, group_id]
id = result[0]
embedding = result[1]
description = result[2]
node_id = result[3]
group_id = result[4]
try:
# Perform the batch search for this id with its single embedding (batch_search caps each request at 3 hits)
id_search_results = await client.batch_search(id, [embedding])
search_results_list.append((id, id_search_results, description, node_id))
except Exception as e:
print(f"Error during batch search for ID {id}: {e}")
continue
return search_results_list
client = get_vector_database()
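Together with the utilities above, this helper forms a small cross-layer linking pipeline. A minimal sketch of the wiring (hedged: assumes grouped_data maps layer UUIDs to the node-description dicts produced by extract_node_descriptions, and that process_items, graph_ready_output and connect_nodes_in_graph are imported from the utils module added in this commit):

async def link_layers(graph, grouped_data, unique_layer_uuids):
    db = get_vector_database()
    # Embed each description for comparison against every other layer
    results_to_check = await process_items(grouped_data, unique_layer_uuids)
    # Run the per-layer vector searches
    search_results = await adapted_qdrant_batch_search(results_to_check, db)
    # Keep matches scoring above 0.9, grouped per layer UUID
    relationship_dict = await graph_ready_output(search_results)
    # Add weighted edges between the matched nodes
    return await connect_nodes_in_graph(graph, relationship_dict)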

View file

@@ -0,0 +1,33 @@
import asyncio
from cognitive_architecture.infrastructure.llm.get_llm_client import get_llm_client
from qdrant_client import models
from cognitive_architecture.infrastructure.databases.vector.get_vector_database import get_vector_database
async def get_embeddings(texts):
client = get_llm_client()
tasks = [ client.async_get_embedding_with_backoff(text, "text-embedding-3-large") for text in texts]
results = await asyncio.gather(*tasks)
return results
async def upload_embedding(id, metadata, some_embeddings, collection_name, client):
client.upload_points(
collection_name=collection_name,
points=[
models.PointStruct(
id=id, vector={"content": some_embeddings}, payload=metadata
)
],
)
async def add_propositions(node_descriptions, client):
for item in node_descriptions:
# get_embeddings is async and expects a list of texts; unwrap the single embedding
embedding = (await get_embeddings([item['description']]))[0]
await upload_embedding(id=item['node_id'], metadata={"meta": item['description']}, some_embeddings=embedding, collection_name=item['layer_decomposition_uuid'], client=client)

View file

@@ -0,0 +1,13 @@
def fetch_context(CONNECTED_GRAPH, id):
relevant_context = []
for n, attr in CONNECTED_GRAPH.nodes(data=True):
if id in n:
relevant_layer = attr['layer_uuid']
for n_, attr_ in CONNECTED_GRAPH.nodes(data=True):
if attr_.get('layer_uuid') == relevant_layer:
print(attr_['description'])
relevant_context.append(attr_['description'])
return relevant_context
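A hedged usage sketch (assumes CONNECTED_GRAPH is the NetworkX graph returned by connect_nodes_in_graph, and that the given id fragment appears in node keys):

context = fetch_context(CONNECTED_GRAPH, "doc1")  # "doc1" is a hypothetical node-key fragment
print(len(context), "descriptions collected from that node's layer")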

View file

@@ -0,0 +1,15 @@
async def find_relevant_chunks(query, unique_layer_uuids):
# NOTE: assumes get_embeddings and a qdrant_search helper are imported from the sibling modules in this commit
out = []
query_embedding = await get_embeddings([query])  # get_embeddings expects a list of texts
for id in unique_layer_uuids:
result = qdrant_search(id, query_embedding[0])
if result:
ids = [point.id for point in result]
scores = [point.score for point in result]
out.append([ids, scores])
return out
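A minimal usage sketch (hedged: unique_layer_uuids as collected in cognify above; each returned entry pairs matched point ids with their scores):

matches = await find_relevant_chunks("dogs in the UK", unique_layer_uuids)
for ids, scores in matches:
    print(ids, scores)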