diff --git a/cognee/api/v1/cognify/memify.py b/cognee/api/v1/cognify/memify.py
index df45bac76..8237059ec 100644
--- a/cognee/api/v1/cognify/memify.py
+++ b/cognee/api/v1/cognify/memify.py
@@ -1,4 +1,5 @@
-from typing import Union, Optional, List, Type
+from typing import Union, Optional, List, Type, Any
+from dataclasses import field
 from uuid import UUID
 
 from cognee.shared.logging_utils import get_logger
@@ -16,15 +17,16 @@ from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
     reset_dataset_pipeline_run_status,
 )
 from cognee.modules.engine.operations.setup import setup
-
-from cognee.tasks.memify.extract_subgraph import extract_subgraph
 from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor
 
 logger = get_logger("memify")
 
 
 async def memify(
-    tasks: List[Task],
+    preprocessing_tasks: List[Task],
+    processing_tasks: List[Task] = [],
+    postprocessing_tasks: List[Task] = [],
+    data: Optional[Any] = None,
     datasets: Union[str, list[str], list[UUID]] = None,
     user: User = None,
     node_type: Optional[Type] = NodeSet,
@@ -55,16 +57,18 @@ async def memify(
     Use pipeline_run_id from return value to monitor progress.
     """
-    if cypher_query:
-        pass
-    else:
-        memory_fragment = await get_memory_fragment(node_type=node_type, node_name=node_name)
-        # List of edges should be a single element in the list to represent one data item
-        data = [memory_fragment.edges]
+    if not data:
+        if cypher_query:
+            pass
+        else:
+            memory_fragment = await get_memory_fragment(node_type=node_type, node_name=node_name)
+            # Wrap the memory fragment in a single-element list so it is treated as one data item
+            data = [memory_fragment]
 
     memify_tasks = [
-        Task(extract_subgraph),
-        *tasks,  # Unpack tasks provided to memify pipeline
+        *preprocessing_tasks,  # Unpack the task stages provided to the memify pipeline
+        *processing_tasks,
+        *postprocessing_tasks,
     ]
 
     await setup()
diff --git a/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt b/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt
index 31c9825bd..d9adf45f7 100644
--- a/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt
+++ b/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt
@@ -1,6 +1,6 @@
 You are an association agent tasked with suggesting structured developer rules from user-agent interactions stored in a Knowledge Graph.
 You will receive the actual user agent interaction as a set of relationships from a knowledge graph separated by \n---\n each represented as node1 -- relation -- node2 triplet, and the list of the already existing developer rules.
 Each rule represents a single best practice or guideline the agent should follow in the future.
-Suggest rules that are general and not specific to the current text, strictly technical, add value and improve the future agent behavior.
+Suggest rules that are general and not specific to the given knowledge graph relationships, are strictly technical, add value, and improve future agent behavior.
 Do not suggest rules similar to the existing ones or rules that are not general and dont add value.
 It is acceptable to return an empty rule list.
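Note on the new entry point: `memify` now takes three task stages that are simply concatenated in order, plus an optional `data` argument that bypasses the memory-fragment lookup entirely. A minimal sketch of the resulting call shape, using only names from this diff (the pass-through processing task is hypothetical and assumes the usual cognee convention of async-generator pipeline tasks):

```python
import asyncio

from cognee.api.v1.cognify.memify import memify
from cognee.modules.pipelines.tasks.task import Task
from cognee.tasks.memify import extract_subgraph


async def passthrough(items):
    # Hypothetical processing stage: forward each item unchanged.
    for item in items:
        yield item


async def run():
    # The three stage lists are unpacked back-to-back, so this is equivalent
    # to the old single `tasks` list with extract_subgraph prepended.
    await memify(
        preprocessing_tasks=[Task(extract_subgraph)],
        processing_tasks=[Task(passthrough)],
        node_name=["coding_rules"],
    )


asyncio.run(run())  # requires a configured cognee environment
```

Since `processing_tasks` and `postprocessing_tasks` default to shared mutable lists (a classic Python pitfall if they were ever mutated), passing explicit lists as above is the safer calling convention.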
diff --git a/cognee/modules/graph/cognee_graph/CogneeGraph.py b/cognee/modules/graph/cognee_graph/CogneeGraph.py
index 94a8e965e..924532ce0 100644
--- a/cognee/modules/graph/cognee_graph/CogneeGraph.py
+++ b/cognee/modules/graph/cognee_graph/CogneeGraph.py
@@ -188,72 +188,3 @@ class CogneeGraph(CogneeAbstractGraph):
                 return n1 + n2 + e
 
         return heapq.nsmallest(k, self.edges, key=score)
-
-    @staticmethod
-    async def resolve_edges_to_text(retrieved_edges: list) -> str:
-        """
-        Converts retrieved graph edges into a human-readable string format.
-
-        Parameters:
-        -----------
-
-            - retrieved_edges (list): A list of edges retrieved from the graph.
-
-        Returns:
-        --------
-
-            - str: A formatted string representation of the nodes and their connections.
-        """
-
-        def _get_nodes(retrieved_edges: list) -> dict:
-            def _get_title(text: str, first_n_words: int = 7, top_n_words: int = 3) -> str:
-                def _top_n_words(text, stop_words=None, top_n=3, separator=", "):
-                    """Concatenates the top N frequent words in text."""
-                    if stop_words is None:
-                        from cognee.modules.retrieval.utils.stop_words import DEFAULT_STOP_WORDS
-
-                        stop_words = DEFAULT_STOP_WORDS
-
-                    import string
-
-                    words = [word.lower().strip(string.punctuation) for word in text.split()]
-
-                    if stop_words:
-                        words = [word for word in words if word and word not in stop_words]
-
-                    from collections import Counter
-
-                    top_words = [word for word, freq in Counter(words).most_common(top_n)]
-
-                    return separator.join(top_words)
-
-                """Creates a title, by combining first words with most frequent words from the text."""
-                first_n_words = text.split()[:first_n_words]
-                top_n_words = _top_n_words(text, top_n=top_n_words)
-                return f"{' '.join(first_n_words)}... [{top_n_words}]"
-
-            """Creates a dictionary of nodes with their names and content."""
-            nodes = {}
-            for edge in retrieved_edges:
-                for node in (edge.node1, edge.node2):
-                    if node.id not in nodes:
-                        text = node.attributes.get("text")
-                        if text:
-                            name = _get_title(text)
-                            content = text
-                        else:
-                            name = node.attributes.get("name", "Unnamed Node")
-                            content = node.attributes.get("description", name)
-                        nodes[node.id] = {"node": node, "name": name, "content": content}
-            return nodes
-
-        nodes = _get_nodes(retrieved_edges)
-        node_section = "\n".join(
-            f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n"
-            for info in nodes.values()
-        )
-        connection_section = "\n".join(
-            f"{nodes[edge.node1.id]['name']} --[{edge.attributes['relationship_type']}]--> {nodes[edge.node2.id]['name']}"
-            for edge in retrieved_edges
-        )
-        return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}"
diff --git a/cognee/modules/graph/utils/__init__.py b/cognee/modules/graph/utils/__init__.py
index d1cda2d83..ebc648495 100644
--- a/cognee/modules/graph/utils/__init__.py
+++ b/cognee/modules/graph/utils/__init__.py
@@ -4,3 +4,4 @@ from .get_model_instance_from_graph import get_model_instance_from_graph
 from .retrieve_existing_edges import retrieve_existing_edges
 from .convert_node_to_data_point import convert_node_to_data_point
 from .deduplicate_nodes_and_edges import deduplicate_nodes_and_edges
+from .resolve_edges_to_text import resolve_edges_to_text
diff --git a/cognee/modules/graph/utils/resolve_edges_to_text.py b/cognee/modules/graph/utils/resolve_edges_to_text.py
new file mode 100644
index 000000000..56c303abc
--- /dev/null
+++ b/cognee/modules/graph/utils/resolve_edges_to_text.py
@@ -0,0 +1,67 @@
+async def resolve_edges_to_text(retrieved_edges: list) -> str:
+    """
+    Converts retrieved graph edges into a human-readable string format.
+
+    Parameters:
+    -----------
+
+        - retrieved_edges (list): A list of edges retrieved from the graph.
+
+    Returns:
+    --------
+
+        - str: A formatted string representation of the nodes and their connections.
+    """
+
+    def _get_nodes(retrieved_edges: list) -> dict:
+        def _get_title(text: str, first_n_words: int = 7, top_n_words: int = 3) -> str:
+            def _top_n_words(text, stop_words=None, top_n=3, separator=", "):
+                """Concatenates the top N frequent words in text."""
+                if stop_words is None:
+                    from cognee.modules.retrieval.utils.stop_words import DEFAULT_STOP_WORDS
+
+                    stop_words = DEFAULT_STOP_WORDS
+
+                import string
+
+                words = [word.lower().strip(string.punctuation) for word in text.split()]
+
+                if stop_words:
+                    words = [word for word in words if word and word not in stop_words]
+
+                from collections import Counter
+
+                top_words = [word for word, freq in Counter(words).most_common(top_n)]
+
+                return separator.join(top_words)
+
+            """Creates a title, by combining first words with most frequent words from the text."""
+            first_n_words = text.split()[:first_n_words]
+            top_n_words = _top_n_words(text, top_n=top_n_words)
+            return f"{' '.join(first_n_words)}... [{top_n_words}]"
+
+        """Creates a dictionary of nodes with their names and content."""
+        nodes = {}
+        for edge in retrieved_edges:
+            for node in (edge.node1, edge.node2):
+                if node.id not in nodes:
+                    text = node.attributes.get("text")
+                    if text:
+                        name = _get_title(text)
+                        content = text
+                    else:
+                        name = node.attributes.get("name", "Unnamed Node")
+                        content = node.attributes.get("description", name)
+                    nodes[node.id] = {"node": node, "name": name, "content": content}
+        return nodes
+
+    nodes = _get_nodes(retrieved_edges)
+    node_section = "\n".join(
+        f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n"
+        for info in nodes.values()
+    )
+    connection_section = "\n".join(
+        f"{nodes[edge.node1.id]['name']} --[{edge.attributes['relationship_type']}]--> {nodes[edge.node2.id]['name']}"
+        for edge in retrieved_edges
+    )
+    return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}"
diff --git a/cognee/modules/retrieval/graph_completion_retriever.py b/cognee/modules/retrieval/graph_completion_retriever.py
index 6a5193c56..bc4fa27b3 100644
--- a/cognee/modules/retrieval/graph_completion_retriever.py
+++ b/cognee/modules/retrieval/graph_completion_retriever.py
@@ -5,6 +5,7 @@ import string
 
 from cognee.infrastructure.engine import DataPoint
 from cognee.tasks.storage import add_data_points
+from cognee.modules.graph.utils import resolve_edges_to_text
 from cognee.modules.graph.utils.convert_node_to_data_point import get_all_subclasses
 from cognee.modules.retrieval.base_retriever import BaseRetriever
 from cognee.modules.retrieval.utils.brute_force_triplet_search import brute_force_triplet_search
@@ -53,22 +54,6 @@ class GraphCompletionRetriever(BaseRetriever):
         self.node_type = node_type
         self.node_name = node_name
 
-    def _get_nodes(self, retrieved_edges: list) -> dict:
-        """Creates a dictionary of nodes with their names and content."""
-        nodes = {}
-        for edge in retrieved_edges:
-            for node in (edge.node1, edge.node2):
-                if node.id not in nodes:
-                    text = node.attributes.get("text")
-                    if text:
-                        name = self._get_title(text)
-                        content = text
-                    else:
-                        name = node.attributes.get("name", "Unnamed Node")
-                        content = node.attributes.get("description", name)
-                    nodes[node.id] = {"node": node, "name": name, "content": content}
-        return nodes
-
     async def resolve_edges_to_text(self, retrieved_edges: list) -> str:
         """
         Converts retrieved graph edges into a human-readable string format.
@@ -83,16 +68,7 @@ class GraphCompletionRetriever(BaseRetriever):
 
             - str: A formatted string representation of the nodes and their connections.
         """
-        nodes = self._get_nodes(retrieved_edges)
-        node_section = "\n".join(
-            f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n"
-            for info in nodes.values()
-        )
-        connection_section = "\n".join(
-            f"{nodes[edge.node1.id]['name']} --[{edge.attributes['relationship_type']}]--> {nodes[edge.node2.id]['name']}"
-            for edge in retrieved_edges
-        )
-        return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}"
+        return await resolve_edges_to_text(retrieved_edges)
 
     async def get_triplets(self, query: str) -> list:
         """
@@ -196,26 +172,6 @@ class GraphCompletionRetriever(BaseRetriever):
 
         return [completion]
 
-    def _top_n_words(self, text, stop_words=None, top_n=3, separator=", "):
-        """Concatenates the top N frequent words in text."""
-        if stop_words is None:
-            stop_words = DEFAULT_STOP_WORDS
-
-        words = [word.lower().strip(string.punctuation) for word in text.split()]
-
-        if stop_words:
-            words = [word for word in words if word and word not in stop_words]
-
-        top_words = [word for word, freq in Counter(words).most_common(top_n)]
-
-        return separator.join(top_words)
-
-    def _get_title(self, text: str, first_n_words: int = 7, top_n_words: int = 3) -> str:
-        """Creates a title, by combining first words with most frequent words from the text."""
-        first_n_words = text.split()[:first_n_words]
-        top_n_words = self._top_n_words(text, top_n=top_n_words)
-        return f"{' '.join(first_n_words)}... [{top_n_words}]"
-
     async def save_qa(self, question: str, answer: str, context: str, triplets: List) -> None:
         """
         Saves a question and answer pair for later analysis or storage.
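With the helper lifted out of both `CogneeGraph` and the retriever, it no longer depends on any instance state and can be exercised in isolation. A quick sketch with stand-in edge objects; the `SimpleNamespace` stubs are hypothetical and only mimic the `.node1`/`.node2`/`.attributes` shape the function actually reads:

```python
import asyncio
from types import SimpleNamespace

from cognee.modules.graph.utils import resolve_edges_to_text

# One node carrying raw text (gets a generated title) and one carrying name/description.
chunk = SimpleNamespace(
    id="n1",
    attributes={"text": "Susan should always review new code changes before merging to main."},
)
rule = SimpleNamespace(id="n2", attributes={"name": "code_review", "description": "Review before merge"})
edge = SimpleNamespace(node1=chunk, node2=rule, attributes={"relationship_type": "mentions"})

print(asyncio.run(resolve_edges_to_text([edge])))
# Prints a "Nodes:" section wrapped in __node_content_start__/__node_content_end__
# markers, then a "Connections:" section along the lines of:
#   Susan should always review new code changes... [susan, review, code] --[mentions]--> code_review
```

The delegating `GraphCompletionRetriever.resolve_edges_to_text` method is kept as a thin async wrapper, so existing callers and subclasses that override it keep working unchanged.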
diff --git a/cognee/tasks/codingagents/coding_rule_associations.py b/cognee/tasks/codingagents/coding_rule_associations.py
index 6971ecc83..e722e7728 100644
--- a/cognee/tasks/codingagents/coding_rule_associations.py
+++ b/cognee/tasks/codingagents/coding_rule_associations.py
@@ -96,10 +96,14 @@ async def add_rule_associations(
     user_prompt_location: str = "coding_rule_association_agent_user.txt",
     system_prompt_location: str = "coding_rule_association_agent_system.txt",
 ):
+    if isinstance(data, list):
+        # If data is a list of strings, join them into a single string
+        data = " ".join(data)
+
     graph_engine = await get_graph_engine()
     existing_rules = await get_existing_rules(rules_nodeset_name=rules_nodeset_name)
 
-    user_context = {"user data": data, "rules": existing_rules}
+    user_context = {"chat": data, "rules": existing_rules}
 
     user_prompt = LLMGateway.render_prompt(user_prompt_location, context=user_context)
     system_prompt = LLMGateway.render_prompt(system_prompt_location, context={})
diff --git a/cognee/tasks/memify/__init__.py b/cognee/tasks/memify/__init__.py
index d2e0172f6..692bac443 100644
--- a/cognee/tasks/memify/__init__.py
+++ b/cognee/tasks/memify/__init__.py
@@ -1 +1,2 @@
 from .extract_subgraph import extract_subgraph
+from .extract_subgraph_chunks import extract_subgraph_chunks
diff --git a/cognee/tasks/memify/extract_subgraph.py b/cognee/tasks/memify/extract_subgraph.py
index 198a5b367..d6ca3773f 100644
--- a/cognee/tasks/memify/extract_subgraph.py
+++ b/cognee/tasks/memify/extract_subgraph.py
@@ -1,7 +1,7 @@
-from cognee.modules.retrieval.utils.brute_force_triplet_search import get_memory_fragment
+from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
 
 
-async def extract_subgraph(subgraphs):
+async def extract_subgraph(subgraphs: list[CogneeGraph]):
     for subgraph in subgraphs:
-        for edge in subgraph:
+        for edge in subgraph.edges:
             yield edge
diff --git a/cognee/tasks/memify/extract_subgraph_chunks.py b/cognee/tasks/memify/extract_subgraph_chunks.py
new file mode 100644
index 000000000..9aab498d7
--- /dev/null
+++ b/cognee/tasks/memify/extract_subgraph_chunks.py
@@ -0,0 +1,11 @@
+from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
+
+
+async def extract_subgraph_chunks(subgraphs: list[CogneeGraph]):
+    """
+    Get all DocumentChunk nodes from the subgraphs and forward their text to the next task in the pipeline.
+    """
+    for subgraph in subgraphs:
+        for node in subgraph.nodes.values():
+            if node.attributes["type"] == "DocumentChunk":
+                yield node.attributes["text"]
diff --git a/examples/python/memify_coding_agent_example.py b/examples/python/memify_coding_agent_example.py
index 70064c346..004a840f8 100644
--- a/examples/python/memify_coding_agent_example.py
+++ b/examples/python/memify_coding_agent_example.py
@@ -1,7 +1,18 @@
 import asyncio
+import pathlib
+import os
+
 import cognee
+from cognee.api.v1.visualize.visualize import visualize_graph
 from cognee.shared.logging_utils import setup_logging, ERROR
-from cognee.api.v1.search import SearchType
+from cognee.api.v1.cognify.memify import memify
+from cognee.modules.pipelines.tasks.task import Task
+from cognee.tasks.memify.extract_subgraph import extract_subgraph
+from cognee.modules.graph.utils import resolve_edges_to_text
+from cognee.tasks.codingagents.coding_rule_associations import (
+    add_rule_associations,
+    get_existing_rules,
+)
 
 # Prerequisites:
 # 1. Copy `.env.template` and rename it to `.env`.
@@ -38,14 +49,10 @@ async def main():
     await cognee.cognify()
     print("Cognify process complete.\n")
 
-    from cognee.api.v1.cognify.memify import memify
+    subgraph_extraction_tasks = [Task(extract_subgraph)]
 
-    from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
-    from cognee.tasks.codingagents.coding_rule_associations import add_rule_associations
-    from cognee.modules.pipelines.tasks.task import Task
-
-    memify_tasks = [
-        Task(CogneeGraph.resolve_edges_to_text, task_config={"batch_size": 10}),
+    rule_association_tasks = [
+        Task(resolve_edges_to_text, task_config={"batch_size": 10}),
         Task(
             add_rule_associations,
             rules_nodeset_name="coding_agent_rules",
@@ -54,11 +61,14 @@
         ),
     ]
 
-    await memify(tasks=memify_tasks, node_name=["coding_rules"])
+    await memify(
+        preprocessing_tasks=subgraph_extraction_tasks,
+        processing_tasks=rule_association_tasks,
+        node_name=["coding_rules"],
+    )
 
-    import os
-    import pathlib
-    from cognee.api.v1.visualize.visualize import visualize_graph
+    developer_rules = await get_existing_rules(rules_nodeset_name="coding_agent_rules")
+    print(developer_rules)
 
     file_path = os.path.join(
         pathlib.Path(__file__).parent, ".artifacts", "graph_visualization.html"
diff --git a/examples/python/memify_coding_agent_example_chunks.py b/examples/python/memify_coding_agent_example_chunks.py
new file mode 100644
index 000000000..b07bcb815
--- /dev/null
+++ b/examples/python/memify_coding_agent_example_chunks.py
@@ -0,0 +1,106 @@
+import asyncio
+import pathlib
+import os
+
+import cognee
+from cognee.api.v1.visualize.visualize import visualize_graph
+from cognee.shared.logging_utils import setup_logging, ERROR
+from cognee.api.v1.cognify.memify import memify
+from cognee.modules.pipelines.tasks.task import Task
+from cognee.tasks.memify.extract_subgraph_chunks import extract_subgraph_chunks
+from cognee.tasks.codingagents.coding_rule_associations import (
+    add_rule_associations,
+    get_existing_rules,
+)
+
+# Prerequisites:
+# 1. Copy `.env.template` and rename it to `.env`.
+# 2. Add your OpenAI API key to the `.env` file in the `LLM_API_KEY` field:
+#    LLM_API_KEY = "your_key_here"
+
+
+async def main():
+    # Create a clean slate for cognee -- reset data and system state
+    print("Resetting cognee data...")
+    await cognee.prune.prune_data()
+    await cognee.prune.prune_system(metadata=True)
+    print("Data reset complete.\n")
+    print("Adding conversation about rules to cognee:\n")
+
+    coding_rules_chat_from_principal_engineer = """
+    We want code to be formatted by PEP8 standards.
+    Typing and Docstrings must be added.
+    Please also make sure to write NOTE: on all more complex code segments.
+    If there is any duplicate code, try to handle it in one function to avoid code duplication.
+    Susan should also always review new code changes before merging to main.
+    New releases should not happen on Friday so we don't have to fix them during the weekend.
+    """
+    print(
+        f"Coding rules conversation with principal engineer: {coding_rules_chat_from_principal_engineer}"
+    )
+
+    coding_rules_chat_from_manager = """
+    Susan should always review new code changes before merging to main.
+    New releases should not happen on Friday so we don't have to fix them during the weekend.
+    """
+    print(f"Coding rules conversation with manager: {coding_rules_chat_from_manager}")
+
+    # Add the text, and make it available for cognify
+    await cognee.add([coding_rules_chat_from_principal_engineer, coding_rules_chat_from_manager])
+    print("Text added successfully.\n")
+
+    # Use LLMs and cognee to create knowledge graph
+    await cognee.cognify()
+    print("Cognify process complete.\n")
+
+    # Visualize graph after cognification
+    file_path = os.path.join(
+        pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_only_cognify.html"
+    )
+    await visualize_graph(file_path)
+    print(f"Open file to see graph visualization only after cognification: {file_path}")
+
+    # After the graph is created, run a second pipeline that goes through the graph and enhances it
+    # with specific coding rule nodes
+
+    # extract_subgraph_chunks is a function that returns all document chunks from the specified subgraphs (if no subgraph is specified, the whole graph is sent through memify)
+    subgraph_extraction_tasks = [Task(extract_subgraph_chunks)]
+
+    # add_rule_associations is a function that extracts coding rules from chunks and keeps track of
+    # existing rules so duplicate rules won't be created. As a result of this processing, new Rule
+    # nodes that capture the coding rules found in conversations are created in the graph.
+    coding_rules_association_tasks = [
+        Task(
+            add_rule_associations,
+            rules_nodeset_name="coding_agent_rules",
+            task_config={"batch_size": 1},
+        ),
+    ]
+
+    # Memify accepts these tasks and orchestrates forwarding of graph data through them (if data is not specified).
+    # If data is explicitly specified in the arguments, that data is forwarded through the tasks instead.
+    await memify(
+        preprocessing_tasks=subgraph_extraction_tasks,
+        processing_tasks=coding_rules_association_tasks,
+    )
+
+    # Find the new coding rules added to the graph through memify (created from the chat conversations between team members)
+    developer_rules = await get_existing_rules(rules_nodeset_name="coding_agent_rules")
+    print(developer_rules)
+
+    # Visualize the new graph with the added memify context
+    file_path = os.path.join(
+        pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_after_memify.html"
+    )
+    await visualize_graph(file_path)
+    print(f"Open file to see graph visualization after memify enhancement: {file_path}")
+
+
+if __name__ == "__main__":
+    logger = setup_logging(log_level=ERROR)
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+    try:
+        loop.run_until_complete(main())
+    finally:
+        loop.run_until_complete(loop.shutdown_asyncgens())