feat: memify next iteration

This commit is contained in:
Igor Ilic 2025-09-03 16:08:32 +02:00
parent 1a2977779f
commit 2847569616
12 changed files with 235 additions and 144 deletions

View file

@ -1,4 +1,5 @@
from typing import Union, Optional, List, Type
from typing import Union, Optional, List, Type, Any
from dataclasses import field
from uuid import UUID
from cognee.shared.logging_utils import get_logger
@ -16,15 +17,16 @@ from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
reset_dataset_pipeline_run_status,
)
from cognee.modules.engine.operations.setup import setup
from cognee.tasks.memify.extract_subgraph import extract_subgraph
from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor
logger = get_logger("memify")
async def memify(
tasks: List[Task],
preprocessing_tasks: List[Task],
processing_tasks: List[Task] = [],
postprocessing_tasks: List[Task] = [],
data: Optional[Any] = None,
datasets: Union[str, list[str], list[UUID]] = None,
user: User = None,
node_type: Optional[Type] = NodeSet,
@ -55,16 +57,18 @@ async def memify(
Use pipeline_run_id from return value to monitor progress.
"""
if cypher_query:
pass
else:
memory_fragment = await get_memory_fragment(node_type=node_type, node_name=node_name)
# List of edges should be a single element in the list to represent one data item
data = [memory_fragment.edges]
if not data:
if cypher_query:
pass
else:
memory_fragment = await get_memory_fragment(node_type=node_type, node_name=node_name)
# Subgraphs should be a single element in the list to represent one data item
data = [memory_fragment]
memify_tasks = [
Task(extract_subgraph),
*tasks, # Unpack tasks provided to memify pipeline
*preprocessing_tasks, # Unpack tasks provided to memify pipeline
*processing_tasks,
*postprocessing_tasks,
]
await setup()

View file

@ -1,6 +1,6 @@
You are an association agent tasked with suggesting structured developer rules from user-agent interactions stored in a Knowledge Graph.
You will receive the actual user agent interaction as a set of relationships from a knowledge graph separated by \n---\n each represented as node1 -- relation -- node2 triplet, and the list of the already existing developer rules.
Each rule represents a single best practice or guideline the agent should follow in the future.
Suggest rules that are general and not specific to the current text, strictly technical, add value and improve the future agent behavior.
Suggest rules that are general and not specific to the knowledge graph relationships, strictly technical, add value and improve the future agent behavior.
Do not suggest rules similar to the existing ones or rules that are not general and dont add value.
It is acceptable to return an empty rule list.

View file

@ -188,72 +188,3 @@ class CogneeGraph(CogneeAbstractGraph):
return n1 + n2 + e
return heapq.nsmallest(k, self.edges, key=score)
@staticmethod
async def resolve_edges_to_text(retrieved_edges: list) -> str:
"""
Converts retrieved graph edges into a human-readable string format.
Parameters:
-----------
- retrieved_edges (list): A list of edges retrieved from the graph.
Returns:
--------
- str: A formatted string representation of the nodes and their connections.
"""
def _get_nodes(retrieved_edges: list) -> dict:
def _get_title(text: str, first_n_words: int = 7, top_n_words: int = 3) -> str:
def _top_n_words(text, stop_words=None, top_n=3, separator=", "):
"""Concatenates the top N frequent words in text."""
if stop_words is None:
from cognee.modules.retrieval.utils.stop_words import DEFAULT_STOP_WORDS
stop_words = DEFAULT_STOP_WORDS
import string
words = [word.lower().strip(string.punctuation) for word in text.split()]
if stop_words:
words = [word for word in words if word and word not in stop_words]
from collections import Counter
top_words = [word for word, freq in Counter(words).most_common(top_n)]
return separator.join(top_words)
"""Creates a title, by combining first words with most frequent words from the text."""
first_n_words = text.split()[:first_n_words]
top_n_words = _top_n_words(text, top_n=top_n_words)
return f"{' '.join(first_n_words)}... [{top_n_words}]"
"""Creates a dictionary of nodes with their names and content."""
nodes = {}
for edge in retrieved_edges:
for node in (edge.node1, edge.node2):
if node.id not in nodes:
text = node.attributes.get("text")
if text:
name = _get_title(text)
content = text
else:
name = node.attributes.get("name", "Unnamed Node")
content = node.attributes.get("description", name)
nodes[node.id] = {"node": node, "name": name, "content": content}
return nodes
nodes = _get_nodes(retrieved_edges)
node_section = "\n".join(
f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n"
for info in nodes.values()
)
connection_section = "\n".join(
f"{nodes[edge.node1.id]['name']} --[{edge.attributes['relationship_type']}]--> {nodes[edge.node2.id]['name']}"
for edge in retrieved_edges
)
return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}"

View file

@ -4,3 +4,4 @@ from .get_model_instance_from_graph import get_model_instance_from_graph
from .retrieve_existing_edges import retrieve_existing_edges
from .convert_node_to_data_point import convert_node_to_data_point
from .deduplicate_nodes_and_edges import deduplicate_nodes_and_edges
from .resolve_edges_to_text import resolve_edges_to_text

View file

@ -0,0 +1,67 @@
async def resolve_edges_to_text(retrieved_edges: list) -> str:
"""
Converts retrieved graph edges into a human-readable string format.
Parameters:
-----------
- retrieved_edges (list): A list of edges retrieved from the graph.
Returns:
--------
- str: A formatted string representation of the nodes and their connections.
"""
def _get_nodes(retrieved_edges: list) -> dict:
def _get_title(text: str, first_n_words: int = 7, top_n_words: int = 3) -> str:
def _top_n_words(text, stop_words=None, top_n=3, separator=", "):
"""Concatenates the top N frequent words in text."""
if stop_words is None:
from cognee.modules.retrieval.utils.stop_words import DEFAULT_STOP_WORDS
stop_words = DEFAULT_STOP_WORDS
import string
words = [word.lower().strip(string.punctuation) for word in text.split()]
if stop_words:
words = [word for word in words if word and word not in stop_words]
from collections import Counter
top_words = [word for word, freq in Counter(words).most_common(top_n)]
return separator.join(top_words)
"""Creates a title, by combining first words with most frequent words from the text."""
first_n_words = text.split()[:first_n_words]
top_n_words = _top_n_words(text, top_n=top_n_words)
return f"{' '.join(first_n_words)}... [{top_n_words}]"
"""Creates a dictionary of nodes with their names and content."""
nodes = {}
for edge in retrieved_edges:
for node in (edge.node1, edge.node2):
if node.id not in nodes:
text = node.attributes.get("text")
if text:
name = _get_title(text)
content = text
else:
name = node.attributes.get("name", "Unnamed Node")
content = node.attributes.get("description", name)
nodes[node.id] = {"node": node, "name": name, "content": content}
return nodes
nodes = _get_nodes(retrieved_edges)
node_section = "\n".join(
f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n"
for info in nodes.values()
)
connection_section = "\n".join(
f"{nodes[edge.node1.id]['name']} --[{edge.attributes['relationship_type']}]--> {nodes[edge.node2.id]['name']}"
for edge in retrieved_edges
)
return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}"

View file

@ -5,6 +5,7 @@ import string
from cognee.infrastructure.engine import DataPoint
from cognee.tasks.storage import add_data_points
from cognee.modules.graph.utils import resolve_edges_to_text
from cognee.modules.graph.utils.convert_node_to_data_point import get_all_subclasses
from cognee.modules.retrieval.base_retriever import BaseRetriever
from cognee.modules.retrieval.utils.brute_force_triplet_search import brute_force_triplet_search
@ -53,22 +54,6 @@ class GraphCompletionRetriever(BaseRetriever):
self.node_type = node_type
self.node_name = node_name
def _get_nodes(self, retrieved_edges: list) -> dict:
"""Creates a dictionary of nodes with their names and content."""
nodes = {}
for edge in retrieved_edges:
for node in (edge.node1, edge.node2):
if node.id not in nodes:
text = node.attributes.get("text")
if text:
name = self._get_title(text)
content = text
else:
name = node.attributes.get("name", "Unnamed Node")
content = node.attributes.get("description", name)
nodes[node.id] = {"node": node, "name": name, "content": content}
return nodes
async def resolve_edges_to_text(self, retrieved_edges: list) -> str:
"""
Converts retrieved graph edges into a human-readable string format.
@ -83,16 +68,7 @@ class GraphCompletionRetriever(BaseRetriever):
- str: A formatted string representation of the nodes and their connections.
"""
nodes = self._get_nodes(retrieved_edges)
node_section = "\n".join(
f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n"
for info in nodes.values()
)
connection_section = "\n".join(
f"{nodes[edge.node1.id]['name']} --[{edge.attributes['relationship_type']}]--> {nodes[edge.node2.id]['name']}"
for edge in retrieved_edges
)
return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}"
return await resolve_edges_to_text(retrieved_edges)
async def get_triplets(self, query: str) -> list:
"""
@ -196,26 +172,6 @@ class GraphCompletionRetriever(BaseRetriever):
return [completion]
def _top_n_words(self, text, stop_words=None, top_n=3, separator=", "):
"""Concatenates the top N frequent words in text."""
if stop_words is None:
stop_words = DEFAULT_STOP_WORDS
words = [word.lower().strip(string.punctuation) for word in text.split()]
if stop_words:
words = [word for word in words if word and word not in stop_words]
top_words = [word for word, freq in Counter(words).most_common(top_n)]
return separator.join(top_words)
def _get_title(self, text: str, first_n_words: int = 7, top_n_words: int = 3) -> str:
"""Creates a title, by combining first words with most frequent words from the text."""
first_n_words = text.split()[:first_n_words]
top_n_words = self._top_n_words(text, top_n=top_n_words)
return f"{' '.join(first_n_words)}... [{top_n_words}]"
async def save_qa(self, question: str, answer: str, context: str, triplets: List) -> None:
"""
Saves a question and answer pair for later analysis or storage.

View file

@ -96,10 +96,14 @@ async def add_rule_associations(
user_prompt_location: str = "coding_rule_association_agent_user.txt",
system_prompt_location: str = "coding_rule_association_agent_system.txt",
):
if isinstance(data, list):
# If data is a list of strings join all strings in list
data = " ".join(data)
graph_engine = await get_graph_engine()
existing_rules = await get_existing_rules(rules_nodeset_name=rules_nodeset_name)
user_context = {"user data": data, "rules": existing_rules}
user_context = {"chat": data, "rules": existing_rules}
user_prompt = LLMGateway.render_prompt(user_prompt_location, context=user_context)
system_prompt = LLMGateway.render_prompt(system_prompt_location, context={})

View file

@ -1 +1,2 @@
from .extract_subgraph import extract_subgraph
from .extract_subgraph_chunks import extract_subgraph_chunks

View file

@ -1,7 +1,7 @@
from cognee.modules.retrieval.utils.brute_force_triplet_search import get_memory_fragment
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
async def extract_subgraph(subgraphs):
async def extract_subgraph(subgraphs: list[CogneeGraph]):
for subgraph in subgraphs:
for edge in subgraph:
for edge in subgraph.edges:
yield edge

View file

@ -0,0 +1,11 @@
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
async def extract_subgraph_chunks(subgraphs: list[CogneeGraph]):
"""
Get all Document Chunks from subgraphs and forward to next task in pipeline
"""
for subgraph in subgraphs:
for node in subgraph.nodes.values():
if node.attributes["type"] == "DocumentChunk":
yield node.attributes["text"]

View file

@ -1,7 +1,18 @@
import asyncio
import pathlib
import os
import cognee
from cognee.api.v1.visualize.visualize import visualize_graph
from cognee.shared.logging_utils import setup_logging, ERROR
from cognee.api.v1.search import SearchType
from cognee.api.v1.cognify.memify import memify
from cognee.modules.pipelines.tasks.task import Task
from cognee.tasks.memify.extract_subgraph import extract_subgraph
from cognee.modules.graph.utils import resolve_edges_to_text
from cognee.tasks.codingagents.coding_rule_associations import (
add_rule_associations,
get_existing_rules,
)
# Prerequisites:
# 1. Copy `.env.template` and rename it to `.env`.
@ -38,14 +49,10 @@ async def main():
await cognee.cognify()
print("Cognify process complete.\n")
from cognee.api.v1.cognify.memify import memify
subgraph_extraction_tasks = [Task(extract_subgraph)]
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
from cognee.tasks.codingagents.coding_rule_associations import add_rule_associations
from cognee.modules.pipelines.tasks.task import Task
memify_tasks = [
Task(CogneeGraph.resolve_edges_to_text, task_config={"batch_size": 10}),
rule_association_tasks = [
Task(resolve_edges_to_text, task_config={"batch_size": 10}),
Task(
add_rule_associations,
rules_nodeset_name="coding_agent_rules",
@ -54,11 +61,14 @@ async def main():
),
]
await memify(tasks=memify_tasks, node_name=["coding_rules"])
await memify(
preprocessing_tasks=subgraph_extraction_tasks,
processing_tasks=rule_association_tasks,
node_name=["coding_rules"],
)
import os
import pathlib
from cognee.api.v1.visualize.visualize import visualize_graph
developer_rules = await get_existing_rules(rules_nodeset_name="coding_agent_rules")
print(developer_rules)
file_path = os.path.join(
pathlib.Path(__file__).parent, ".artifacts", "graph_visualization.html"

View file

@ -0,0 +1,106 @@
import asyncio
import pathlib
import os
import cognee
from cognee.api.v1.visualize.visualize import visualize_graph
from cognee.shared.logging_utils import setup_logging, ERROR
from cognee.api.v1.cognify.memify import memify
from cognee.modules.pipelines.tasks.task import Task
from cognee.tasks.memify.extract_subgraph_chunks import extract_subgraph_chunks
from cognee.tasks.codingagents.coding_rule_associations import (
add_rule_associations,
get_existing_rules,
)
# Prerequisites:
# 1. Copy `.env.template` and rename it to `.env`.
# 2. Add your OpenAI API key to the `.env` file in the `LLM_API_KEY` field:
# LLM_API_KEY = "your_key_here"
async def main():
# Create a clean slate for cognee -- reset data and system state
print("Resetting cognee data...")
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
print("Data reset complete.\n")
print("Adding conversation about rules to cognee:\n")
coding_rules_chat_from_principal_engineer = """
We want code to be formatted by PEP8 standards.
Typing and Docstrings must be added.
Please also make sure to write NOTE: on all more complex code segments.
If there is any duplicate code, try to handle it in one function to avoid code duplication.
Susan should also always review new code changes before merging to main.
New releases should not happen on Friday so we don't have to fix them during the weekend.
"""
print(
f"Coding rules conversation with principal engineer: {coding_rules_chat_from_principal_engineer}"
)
coding_rules_chat_from_manager = """
Susan should always review new code changes before merging to main.
New releases should not happen on Friday so we don't have to fix them during the weekend.
"""
print(f"Coding rules conversation with manager: {coding_rules_chat_from_manager}")
# Add the text, and make it available for cognify
await cognee.add([coding_rules_chat_from_principal_engineer, coding_rules_chat_from_manager])
print("Text added successfully.\n")
# Use LLMs and cognee to create knowledge graph
await cognee.cognify()
print("Cognify process complete.\n")
# Visualize graph after cognification
file_path = os.path.join(
pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_only_cognify.html"
)
await visualize_graph(file_path)
print(f"Open file to see graph visualization only after cognification: {file_path}")
# After graph is created, create a second pipeline that will go through the graph and enchance it with specific
# coding rule nodes
# extract_subgraph_chunks is a function that returns all document chunks from specified subgraphs (if no subgraph is specifed the whole graph will be sent through memify)
subgraph_extraction_tasks = [Task(extract_subgraph_chunks)]
# add_rule_associations is a function that handles processing coding rules from chunks and keeps track of
# existing rules so duplicate rules won't be created. As the result of this processing new Rule nodes will be created
# in the graph that specify coding rules found in conversations.
coding_rules_association_tasks = [
Task(
add_rule_associations,
rules_nodeset_name="coding_agent_rules",
task_config={"batch_size": 1},
),
]
# Memify accepts these tasks and orchestrates forwarding of graph data through these tasks (if data is not specified).
# If data is explicitely specified in the arguments this specified data will be forwarded through the tasks instead
await memify(
preprocessing_tasks=subgraph_extraction_tasks,
processing_tasks=coding_rules_association_tasks,
)
# Find the new specific coding rules added to graph through memify (created based on chat conversation between team members)
developer_rules = await get_existing_rules(rules_nodeset_name="coding_agent_rules")
print(developer_rules)
# Visualize new graph with added memify context
file_path = os.path.join(
pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_after_memify.html"
)
await visualize_graph(file_path)
print(f"Open file to see graph visualization after memify enhancment: {file_path}")
if __name__ == "__main__":
logger = setup_logging(log_level=ERROR)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())