Add utils for graph visualization + classification nodes
parent d69d42b451
commit df6d1a0f07
9 changed files with 892 additions and 934 deletions
Demo_graph.ipynb (1580 lines changed)
File diff suppressed because one or more lines are too long
@@ -17,7 +17,7 @@ from cognitive_architecture.infrastructure.databases.vector.qdrant.adapter impor
 from cognitive_architecture.infrastructure.llm.get_llm_client import get_llm_client
 from cognitive_architecture.modules.cognify.graph.add_classification_nodes import add_classification_nodes
 from cognitive_architecture.modules.cognify.graph.add_node_connections import add_node_connection, graph_ready_output, \
-    connect_nodes_in_graph
+    connect_nodes_in_graph, extract_node_descriptions
 from cognitive_architecture.modules.cognify.graph.add_propositions import append_to_graph
 from cognitive_architecture.modules.cognify.llm.add_node_connection_embeddings import process_items
 from cognitive_architecture.modules.cognify.vector.batch_search import adapted_qdrant_batch_search
@@ -100,11 +100,14 @@ async def cognify(input_text:str):
 
     # Run the async function for each set of cognitive layers
     layer_1_graph = await async_graph_per_layer(input_article_one, cognitive_layers_one)
-    print(layer_1_graph)
-
-
-
+    # print(layer_1_graph)
+    #
+    #
+    #
     graph_client = get_graph_client(GraphDBType.NETWORKX)
+    #
+    # ADD SUMMARY
+    # ADD CATEGORIES
 
     # Define a GraphModel instance with example data
     graph_model_instance = DefaultGraphModel(
@@ -154,23 +157,15 @@ async def cognify(input_text:str):
 
     F, unique_layer_uuids = await append_to_graph(layer_1_graph, required_layers_one, graph_client)
 
-    def extract_node_descriptions(data):
-        descriptions = []
-        for node_id, attributes in data:
-            if 'description' in attributes and 'id' in attributes:
-                descriptions.append({'node_id': attributes['id'], 'description': attributes['description'],
-                                     'layer_uuid': attributes['layer_uuid'],
-                                     'layer_decomposition_uuid': attributes['layer_decomposition_uuid']})
-        return descriptions
-
-    # Extract the node descriptions
+    # # Extract the node descriptions
     await graph_client.load_graph_from_file()
     graph = graph_client.graph
-    node_descriptions = extract_node_descriptions(graph.nodes(data=True))
-    # unique_layer_uuids = set(node['layer_decomposition_uuid'] for node in node_descriptions)
-    #
+    node_descriptions = await extract_node_descriptions(graph.nodes(data=True))
+    unique_layer_uuids = set(node['layer_decomposition_uuid'] for node in node_descriptions)
+    # #
     db = get_vector_database()
 
 
+    #
+    #
     collection_config = CollectionConfig(
         vector_config={
             'content': models.VectorParams(
@@ -181,27 +176,49 @@ async def cognify(input_text:str):
         # Set other configs as needed
     )
 
-    for layer in unique_layer_uuids:
-        await db.create_collection(layer,collection_config)
-    await add_propositions(node_descriptions, db)
+    from qdrant_client import QdrantClient
+    try:
+        for layer in unique_layer_uuids:
+            await db.create_collection(layer,collection_config)
+    except:
+        pass
+
+    # #to check if it works
+    # qdrant = QdrantClient(
+    #     url=os.getenv('QDRANT_URL'),
+    #     api_key=os.getenv('QDRANT_API_KEY'))
+    #
+    # collections_response = qdrant.http.collections_api.get_collections()
+    # collections = collections_response.result.collections
+    # print(collections)
+
+    # print(node_descriptions)
+    #
+    await add_propositions(node_descriptions)
 
     from cognitive_architecture.infrastructure.databases.vector.qdrant.adapter import AsyncQdrantClient
 
     grouped_data = await add_node_connection(graph_client, db, node_descriptions)
 
+    # print("we are here, grouped_data", grouped_data)
 
     llm_client = get_llm_client()
 
 
     relationship_dict = await process_items(grouped_data, unique_layer_uuids, llm_client)
 
+    # print("we are here", relationship_dict[0])
 
     results = await adapted_qdrant_batch_search(relationship_dict, db)
     # print(results)
     relationship_d = graph_ready_output(results)
     # print(relationship_d)
 
     CONNECTED_GRAPH = connect_nodes_in_graph(F, relationship_d)
 
-    out = await render_graph(CONNECTED_GRAPH, graph_type='networkx')
+    out = await render_graph(CONNECTED_GRAPH.graph, graph_type='networkx')
     print(out)
     return CONNECTED_GRAPH
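The tail of this hunk is the new wiring: vector-search results flow through graph_ready_output into a per-layer relationship dict, and connect_nodes_in_graph turns each high-scoring match into a weighted edge. A toy illustration with a plain NetworkX graph (the helper import from add_node_connections is assumed from the repo, and all ids are made up):

import networkx as nx
from cognitive_architecture.modules.cognify.graph.add_node_connections import connect_nodes_in_graph

G = nx.Graph()
G.add_node("a", unique_id="n1", description="first proposition")
G.add_node("b", unique_id="n2", description="second proposition")

# shape produced by graph_ready_output: one list of matches per layer uuid
relationship_d = {
    "layer-uuid-1": [{
        "searched_node_id": "n1",
        "original_id_for_search": "n2",
        "score": 0.95,
        "score_metadata": {"meta": "first proposition"},
    }]
}

connected = connect_nodes_in_graph(G, relationship_d)
print(connected.edges(data=True))  # one a--b edge with weight=0.95

Matching is done on the unique_id node attribute, which is exactly what the add_node_connections.py hunks below switch to.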
@@ -228,4 +245,8 @@ async def cognify(input_text:str):
 
 
 if __name__ == "__main__":
-    asyncio.run(cognify("The quick brown fox jumps over the lazy dog"))
+    asyncio.run(cognify("""In the nicest possible way, Britons have always been a bit silly about animals. “Keeping pets, for the English, is not so much a leisure activity as it is an entire way of life,” wrote the anthropologist Kate Fox in Watching the English, nearly 20 years ago. Our dogs, in particular, have been an acceptable outlet for emotions and impulses we otherwise keep strictly controlled – our latent desire to be demonstratively affectionate, to be silly and chat to strangers. If this seems like an exaggeration, consider the different reactions you’d get if you struck up a conversation with someone in a park with a dog, versus someone on the train.
+Indeed, British society has been set up to accommodate these four-legged ambassadors. In the UK – unlike Australia, say, or New Zealand – dogs are not just permitted on public transport but often openly encouraged. Many pubs and shops display waggish signs, reading, “Dogs welcome, people tolerated”, and have treat jars on their counters. The other day, as I was waiting outside a cafe with a friend’s dog, the barista urged me to bring her inside.
+For years, Britons’ non-partisan passion for animals has been consistent amid dwindling common ground. But lately, rather than bringing out the best in us, our relationship with dogs is increasingly revealing us at our worst – and our supposed “best friends” are paying the price.
+As with so many latent traits in the national psyche, it all came unleashed with the pandemic, when many people thought they might as well make the most of all that time at home and in local parks with a dog. Between 2019 and 2022, the number of pet dogs in the UK rose from about nine million to 13 million. But there’s long been a seasonal surge around this time of year, substantial enough for the Dogs Trust charity to coin its famous slogan back in 1978: “A dog is for life, not just for Christmas.”
+"""))
@@ -60,6 +60,16 @@ class QDrantAdapter(VectorDBInterface):
             points = data_points
         )
 
+    async def search(self, collection_name: str, query_vector: List[float], limit: int, with_vector: bool = False):
+        client = self.get_qdrant_client()
+
+        return await client.search(
+            collection_name = collection_name,
+            query_vector = query_vector,
+            limit = limit,
+            with_vectors = with_vector
+        )
+
     async def batch_search(self, collection_name: str, embeddings: List[List[float]],
                            with_vectors: List[bool] = None):
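A minimal usage sketch for the new search method, assuming get_vector_database() returns this QDrantAdapter; the collection name is hypothetical, and 3072 is the output dimensionality of text-embedding-3-large:

import asyncio
from cognitive_architecture.infrastructure.databases.vector.get_vector_database import get_vector_database

async def demo():
    db = get_vector_database()
    hits = await db.search(
        collection_name="layer-uuid-example",  # hypothetical collection
        query_vector=[0.0] * 3072,             # text-embedding-3-large vector size
        limit=5,
    )
    for hit in hits:  # qdrant ScoredPoint objects
        print(hit.id, hit.score)

asyncio.run(demo())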
@@ -68,12 +68,15 @@ class VectorDBInterface(Protocol):
     #     data_point_id: str
     # ): raise NotImplementedError
     """ Search """
-    # @abstractmethod
-    # async def search(
-    #     self,
-    #     collection_name: str,
-    #     query: object
-    # ): raise NotImplementedError
+    @abstractmethod
+    async def search(
+        self,
+        collection_name: str,
+        query_vector: List[float],
+        limit: int,
+        with_vector: bool = False
+
+    ): raise NotImplementedError
 
     @abstractmethod
     async def batch_search(
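Any adapter claiming to satisfy this Protocol must now expose the vector-based signature instead of the old query-object one. A hypothetical in-memory stub for tests (batch_search omitted for brevity, so this is only a partial implementation):

from typing import List

class InMemoryVectorDB:
    """Hypothetical test double for the search portion of VectorDBInterface."""

    def __init__(self):
        self.points = {}  # collection_name -> list of stored points

    async def search(self, collection_name: str, query_vector: List[float],
                     limit: int, with_vector: bool = False):
        # no real similarity scoring; just return the first `limit` points
        return self.points.get(collection_name, [])[:limit]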
@@ -2,11 +2,11 @@ from cognitive_architecture.infrastructure.databases.graph.get_graph_client impo
 from cognitive_architecture.shared.data_models import GraphDBType
 
 
-def extract_node_descriptions(data):
+async def extract_node_descriptions(data):
     descriptions = []
     for node_id, attributes in data:
-        if 'description' in attributes and 'id' in attributes:
-            descriptions.append({'node_id': attributes['id'], 'description': attributes['description'], 'layer_uuid': attributes['layer_uuid'], 'layer_decomposition_uuid': attributes['layer_decomposition_uuid'] })
+        if 'description' in attributes and 'unique_id' in attributes:
+            descriptions.append({'node_id': attributes['unique_id'], 'description': attributes['description'], 'layer_uuid': attributes['layer_uuid'], 'layer_decomposition_uuid': attributes['layer_decomposition_uuid'] })
     return descriptions
 
 
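Since extract_node_descriptions is now a coroutine and keys off unique_id instead of id, a quick sanity check with a toy NetworkX graph looks like this (the import is the one added to the cognify imports above; nodes must carry all four attributes to be picked up):

import asyncio
import networkx as nx
from cognitive_architecture.modules.cognify.graph.add_node_connections import extract_node_descriptions

G = nx.Graph()
G.add_node("n0",
           unique_id="doc-1",
           description="a proposition about foxes",
           layer_uuid="layer-a",
           layer_decomposition_uuid="decomp-a")
G.add_node("n1")  # skipped: no description / unique_id attributes

print(asyncio.run(extract_node_descriptions(G.nodes(data=True))))
# [{'node_id': 'doc-1', 'description': 'a proposition about foxes', ...}]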
@@ -15,7 +15,7 @@ def extract_node_descriptions(data):
 async def add_node_connection(graph_client, vector_database_client, data):
 
     graph = graph_client.graph
-    node_descriptions = extract_node_descriptions(graph.nodes(data=True))
+    node_descriptions = data
 
 
     grouped_data = {}
@@ -56,10 +56,10 @@ def connect_nodes_in_graph(graph, relationship_dict):
 
     # Find nodes in the graph that match the searched_node_id and score_id from their attributes
     for node, attrs in graph.nodes(data=True):
-        if 'id' in attrs:  # Ensure there is an 'id' attribute
-            if attrs['id'] == searched_node_attr_id:
+        if 'unique_id' in attrs:  # Ensure there is an 'id' attribute
+            if attrs['unique_id'] == searched_node_attr_id:
                 searched_node_key = node
-            elif attrs['id'] == score_attr_id:
+            elif attrs['unique_id'] == score_attr_id:
                 score_node_key = node
 
     # If both nodes are found, no need to continue checking other nodes
@@ -1,101 +0,0 @@
-
-
-
-
-async def process_items(grouped_data, unique_layer_uuids):
-    results_to_check = []  # This will hold results excluding self comparisons
-    tasks = []  # List to hold all tasks
-    task_to_info = {}  # Dictionary to map tasks to their corresponding group id and item info
-
-    # Iterate through each group in grouped_data
-    for group_id, items in grouped_data.items():
-        # Filter unique_layer_uuids to exclude the current group_id
-        target_uuids = [uuid for uuid in unique_layer_uuids if uuid != group_id]
-
-        # Process each item in the group
-        for item in items:
-            # For each target UUID, create an async task for the item's embedding retrieval
-            for target_id in target_uuids:
-                task = asyncio.create_task \
-                    (async_get_embedding_with_backoff(item['description'], "text-embedding-3-large"))
-                tasks.append(task)
-                # Map the task to the target id, item's node_id, and description for later retrieval
-                task_to_info[task] = (target_id, item['node_id'], group_id, item['description'])
-
-    # Await all tasks to complete and gather results
-    results = await asyncio.gather(*tasks)
-
-    # Process the results, associating them with their target id, node id, and description
-    for task, embedding in zip(tasks, results):
-
-        target_id, node_id ,group_id, description = task_to_info[task]
-        results_to_check.append([target_id, embedding, description, node_id, group_id])
-
-    return results_to_check
-
-async def graph_ready_output(results):
-    relationship_dict ={}
-
-    for result_tuple in results:
-
-        uuid, scored_points_list, desc, node_id = result_tuple
-        # Unpack the tuple
-
-        # Ensure there's a list to collect related items for this uuid
-        if uuid not in relationship_dict:
-            relationship_dict[uuid] = []
-
-        for scored_points in scored_points_list:  # Iterate over the list of ScoredPoint lists
-            for scored_point in scored_points:  # Iterate over each ScoredPoint object
-                if scored_point.score > 0.9:  # Check the score condition
-                    # Append a new dictionary to the list associated with the uuid
-                    relationship_dict[uuid].append({
-                        'collection_name_uuid': uuid,
-                        'searched_node_id': scored_point.id,
-                        'score': scored_point.score,
-                        'score_metadata': scored_point.payload,
-                        'original_id_for_search': node_id,
-                    })
-    return relationship_dict
-
-async def connect_nodes_in_graph(graph, relationship_dict):
-    """
-    For each relationship in relationship_dict, check if both nodes exist in the graph based on node attributes.
-    If they do, create a connection (edge) between them.
-
-    :param graph: A NetworkX graph object
-    :param relationship_dict: A dictionary containing relationships between nodes
-    """
-    for id, relationships in relationship_dict.items():
-        for relationship in relationships:
-            searched_node_attr_id = relationship['searched_node_id']
-            print(searched_node_attr_id)
-            score_attr_id = relationship['original_id_for_search']
-            score = relationship['score']
-
-
-            # Initialize node keys for both searched_node and score_node
-            searched_node_key, score_node_key = None, None
-
-            # Find nodes in the graph that match the searched_node_id and score_id from their attributes
-            for node, attrs in graph.nodes(data=True):
-                if 'id' in attrs:  # Ensure there is an 'id' attribute
-                    if attrs['id'] == searched_node_attr_id:
-                        searched_node_key = node
-                    elif attrs['id'] == score_attr_id:
-                        score_node_key = node
-
-                # If both nodes are found, no need to continue checking other nodes
-                if searched_node_key and score_node_key:
-                    break
-
-            # Check if both nodes were found in the graph
-            if searched_node_key is not None and score_node_key is not None:
-                print(searched_node_key)
-                print(score_node_key)
-                # If both nodes exist, create an edge between them
-                # You can customize the edge attributes as needed, here we use 'score' as an attribute
-                graph.add_edge(searched_node_key, score_node_key, weight=score, score_metadata=relationship.get('score_metadata'))
-
-    return graph
@@ -15,8 +15,7 @@ async def process_items(grouped_data, unique_layer_uuids, llm_client):
         for item in items:
             # For each target UUID, create an async task for the item's embedding retrieval
             for target_id in target_uuids:
-                task = asyncio.create_task(
-                    llm_client.async_get_embedding_with_backoff(item['description'], "text-embedding-3-large"))
+                task = asyncio.create_task(llm_client.async_get_embedding_with_backoff(item['description'], "text-embedding-3-large"))
                 tasks.append(task)
                 # Map the task to the target id, item's node_id, and description for later retrieval
                 task_to_info[task] = (target_id, item['node_id'], group_id, item['description'])
@@ -24,6 +23,7 @@ async def process_items(grouped_data, unique_layer_uuids, llm_client):
     # Await all tasks to complete and gather results
     results = await asyncio.gather(*tasks)
 
+
     # Process the results, associating them with their target id, node id, and description
     for task, embedding in zip(tasks, results):
         target_id, node_id, group_id, description = task_to_info[task]
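The pattern process_items relies on, reduced to a self-contained miniature: one task per (item, target layer) pair, a dict mapping each task back to its context, then a single gather. Here fake_embed is a stand-in for llm_client.async_get_embedding_with_backoff:

import asyncio

async def fake_embed(text: str) -> list[float]:
    # stand-in for llm_client.async_get_embedding_with_backoff
    await asyncio.sleep(0)
    return [float(len(text))]

async def main():
    grouped = {"g1": ["alpha", "beta"], "g2": ["gamma"]}
    targets = {"g1", "g2"}
    tasks, info = [], {}
    for gid, items in grouped.items():
        for item in items:
            for tgt in targets - {gid}:  # skip self comparisons
                t = asyncio.create_task(fake_embed(item))
                tasks.append(t)
                info[t] = (tgt, item, gid)
    results = await asyncio.gather(*tasks)  # order matches `tasks`
    for t, emb in zip(tasks, results):
        print(info[t], emb)

asyncio.run(main())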
@@ -29,4 +29,6 @@ async def adapted_qdrant_batch_search(results_to_check,vector_client):
 
 if __name__ == '__main__':
 
-    client = get_vector_database()
+    client = get_vector_database()
+
     adapted_qdrant_batch_search()
@@ -4,19 +4,19 @@ from cognitive_architecture.infrastructure.llm.get_llm_client import get_llm_cli
 from qdrant_client import models
 from cognitive_architecture.infrastructure.databases.vector.get_vector_database import get_vector_database
 
-async def get_embeddings(texts):
+async def get_embeddings(texts:list):
+    """ Get embeddings for a list of texts"""
     client = get_llm_client()
     tasks = [ client.async_get_embedding_with_backoff(text, "text-embedding-3-large") for text in texts]
     results = await asyncio.gather(*tasks)
     return results
-async def upload_embedding(id, metadata, some_embeddings, collection_name, client):
-    print(id)
-    # if some_embeddings and isinstance(some_embeddings[0], list):
-    #     some_embeddings = [item for sublist in some_embeddings for item in sublist]
-
-    client.upload_points(
+async def upload_embedding(id, metadata, some_embeddings, collection_name):
+    """ Upload a single embedding to a collection in Qdrant."""
+    client = get_vector_database()
+    # print("Uploading embeddings")
+    await client.create_data_points(
         collection_name=collection_name,
-        points=[
+        data_points=[
             models.PointStruct(
                 id=id, vector={"content" :some_embeddings}, payload=metadata
             )
@@ -25,10 +25,11 @@ async def upload_embedding(id, metadata, some_embeddings, collection_name, clien
     )
 
 
-async def add_propositions(node_descriptions, client):
+async def add_propositions(node_descriptions):
    for item in node_descriptions:
        print(item['node_id'])
-        embeddings = await get_embeddings(item['description'])
-        await upload_embedding(id = item['node_id'], metadata = {"meta":item['description']}, some_embeddings = embeddings[0], collection_name= item['layer_decomposition_uuid'],client= client)
+        embeddings = await get_embeddings([item['description']])
+        await upload_embedding(id = item['node_id'], metadata = {"meta":item['description']},
+                               some_embeddings = embeddings[0],
+                               collection_name= item['layer_decomposition_uuid'])
 
 
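Taken together, the upload path now looks like this from a caller's perspective: get_embeddings wraps each description, and upload_embedding writes the point into the collection named by layer_decomposition_uuid. An illustrative call (ids are made up, the module path is an assumption based on the imports above, and a reachable Qdrant instance behind get_vector_database() is required):

import asyncio
from cognitive_architecture.modules.cognify.graph.add_propositions import add_propositions

node_descriptions = [{
    "node_id": "doc-1",
    "description": "a proposition about foxes",
    "layer_uuid": "layer-a",
    "layer_decomposition_uuid": "decomp-a",  # doubles as the collection name
}]

asyncio.run(add_propositions(node_descriptions))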