refactor: Change input task names

This commit is contained in:
Igor Ilic 2025-09-03 17:49:33 +02:00
parent 90ef8c30d2
commit 0e3a10d925
5 changed files with 51 additions and 148 deletions

View file

@ -1,5 +1,4 @@
from typing import Union, Optional, List, Type, Any
from dataclasses import field
from uuid import UUID
from cognee.shared.logging_utils import get_logger
@ -18,14 +17,17 @@ from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
)
from cognee.modules.engine.operations.setup import setup
from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor
from cognee.tasks.memify.extract_subgraph_chunks import extract_subgraph_chunks
from cognee.tasks.codingagents.coding_rule_associations import (
add_rule_associations,
)
logger = get_logger("memify")
async def memify(
data_streaming_tasks: List[Task],
data_processing_tasks: List[Task] = [],
data_persistence_tasks: List[Task] = [],
extraction_tasks: List[Task] = [Task(extract_subgraph_chunks)],
enrichment_tasks: List[Task] = [Task(add_rule_associations)],
data: Optional[Any] = None,
datasets: Union[str, list[str], list[UUID]] = None,
user: User = None,
@ -66,9 +68,8 @@ async def memify(
data = [memory_fragment]
memify_tasks = [
*data_streaming_tasks, # Unpack tasks provided to memify pipeline
*data_processing_tasks,
*data_persistence_tasks,
*extraction_tasks, # Unpack tasks provided to memify pipeline
*enrichment_tasks,
]
await setup()

View file

@ -1,6 +0,0 @@
You are an association agent tasked with suggesting structured developer rules from user-agent interactions stored in a Knowledge Graph.
You will receive the actual user-agent interaction as a set of relationships from a knowledge graph separated by \n---\n, each represented as a node1 -- relation -- node2 triplet, along with the list of already existing developer rules.
Each rule represents a single best practice or guideline the agent should follow in the future.
Suggest rules that are general and not specific to the knowledge graph relationships, strictly technical, add value and improve the future agent behavior.
Do not suggest rules similar to the existing ones, or rules that are not general and don't add value.
It is acceptable to return an empty rule list.

View file

@ -1,6 +0,0 @@
**Here is the User-agent interaction context provided with a set of relationships from a knowledge graph separated by \n---\n each represented as node1 -- relation -- node2 triplet:**
`{{ chat }}`
**Already existing rules:**
`{{ rules }}`

View file

@ -7,8 +7,7 @@ from cognee.api.v1.visualize.visualize import visualize_graph
from cognee.shared.logging_utils import setup_logging, ERROR
from cognee.api.v1.cognify.memify import memify
from cognee.modules.pipelines.tasks.task import Task
from cognee.tasks.memify.extract_subgraph import extract_subgraph
from cognee.modules.graph.utils import resolve_edges_to_text
from cognee.tasks.memify.extract_subgraph_chunks import extract_subgraph_chunks
from cognee.tasks.codingagents.coding_rule_associations import (
add_rule_associations,
get_existing_rules,
@ -26,54 +25,75 @@ async def main():
await cognee.prune.prune_data()
await cognee.prune.prune_system(metadata=True)
print("Data reset complete.\n")
print("Adding conversation about rules to cognee:\n")
# cognee knowledge graph will be created based on this text
text = """
Natural language processing (NLP) is an interdisciplinary
subfield of computer science and information retrieval.
"""
coding_rules_text = """
Code must be formatted by PEP8 standards.
coding_rules_chat_from_principal_engineer = """
We want code to be formatted by PEP8 standards.
Typing and Docstrings must be added.
Please also make sure to write NOTE: on all more complex code segments.
If there is any duplicate code, try to handle it in one function to avoid code duplication.
Susan should also always review new code changes before merging to main.
New releases should not happen on Friday so we don't have to fix them during the weekend.
"""
print(
f"Coding rules conversation with principal engineer: {coding_rules_chat_from_principal_engineer}"
)
coding_rules_chat_from_manager = """
Susan should always review new code changes before merging to main.
New releases should not happen on Friday so we don't have to fix them during the weekend.
"""
print(f"Coding rules conversation with manager: {coding_rules_chat_from_manager}")
print("Adding text to cognee:")
print(text.strip())
# Add the text, and make it available for cognify
await cognee.add(text)
await cognee.add(coding_rules_text, node_set=["coding_rules"])
await cognee.add([coding_rules_chat_from_principal_engineer, coding_rules_chat_from_manager])
print("Text added successfully.\n")
# Use LLMs and cognee to create knowledge graph
await cognee.cognify()
print("Cognify process complete.\n")
subgraph_extraction_tasks = [Task(extract_subgraph)]
# Visualize graph after cognification
file_path = os.path.join(
pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_only_cognify.html"
)
await visualize_graph(file_path)
print(f"Open file to see graph visualization only after cognification: {file_path}")
rule_association_tasks = [
Task(resolve_edges_to_text, task_config={"batch_size": 10}),
# After graph is created, create a second pipeline that will go through the graph and enhance it with specific
# coding rule nodes
# extract_subgraph_chunks is a function that returns all document chunks from specified subgraphs (if no subgraph is specified the whole graph will be sent through memify)
subgraph_extraction_tasks = [Task(extract_subgraph_chunks)]
# add_rule_associations is a function that handles processing coding rules from chunks and keeps track of
# existing rules so duplicate rules won't be created. As the result of this processing new Rule nodes will be created
# in the graph that specify coding rules found in conversations.
coding_rules_association_tasks = [
Task(
add_rule_associations,
rules_nodeset_name="coding_agent_rules",
user_prompt_location="memify_coding_rule_association_agent_user.txt",
system_prompt_location="memify_coding_rule_association_agent_system.txt",
task_config={"batch_size": 1},
),
]
# Memify accepts these tasks and orchestrates forwarding of graph data through these tasks (if data is not specified).
# If data is explicitly specified in the arguments this specified data will be forwarded through the tasks instead
await memify(
data_streaming_tasks=subgraph_extraction_tasks,
data_processing_tasks=rule_association_tasks,
node_name=["coding_rules"],
extraction_tasks=subgraph_extraction_tasks,
enrichment_tasks=coding_rules_association_tasks,
)
# Find the new specific coding rules added to graph through memify (created based on chat conversation between team members)
developer_rules = await get_existing_rules(rules_nodeset_name="coding_agent_rules")
print(developer_rules)
# Visualize new graph with added memify context
file_path = os.path.join(
pathlib.Path(__file__).parent, ".artifacts", "graph_visualization.html"
pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_after_memify.html"
)
await visualize_graph(file_path)
print(f"Open file to see graph visualization after memify enhancment: {file_path}")
if __name__ == "__main__":

View file

@ -1,106 +0,0 @@
import asyncio
import pathlib
import os
import cognee
from cognee.api.v1.visualize.visualize import visualize_graph
from cognee.shared.logging_utils import setup_logging, ERROR
from cognee.api.v1.cognify.memify import memify
from cognee.modules.pipelines.tasks.task import Task
from cognee.tasks.memify.extract_subgraph_chunks import extract_subgraph_chunks
from cognee.tasks.codingagents.coding_rule_associations import (
add_rule_associations,
get_existing_rules,
)
# Prerequisites:
# 1. Copy `.env.template` and rename it to `.env`.
# 2. Add your OpenAI API key to the `.env` file in the `LLM_API_KEY` field:
# LLM_API_KEY = "your_key_here"
async def main():
    """Demonstrate enriching a cognee knowledge graph with coding rules via memify.

    Steps:
      1. Reset cognee data and system state.
      2. Add two chat transcripts that contain coding guidelines.
      3. Build a knowledge graph with cognify and visualize it.
      4. Run the memify pipeline to extract coding-rule nodes from the chats.
      5. Print the discovered rules and visualize the enriched graph.
    """
    # Create a clean slate for cognee -- reset data and system state
    print("Resetting cognee data...")
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)
    print("Data reset complete.\n")

    print("Adding conversation about rules to cognee:\n")
    coding_rules_chat_from_principal_engineer = """
We want code to be formatted by PEP8 standards.
Typing and Docstrings must be added.
Please also make sure to write NOTE: on all more complex code segments.
If there is any duplicate code, try to handle it in one function to avoid code duplication.
Susan should also always review new code changes before merging to main.
New releases should not happen on Friday so we don't have to fix them during the weekend.
"""
    print(
        f"Coding rules conversation with principal engineer: {coding_rules_chat_from_principal_engineer}"
    )

    coding_rules_chat_from_manager = """
Susan should always review new code changes before merging to main.
New releases should not happen on Friday so we don't have to fix them during the weekend.
"""
    print(f"Coding rules conversation with manager: {coding_rules_chat_from_manager}")

    # Add the text, and make it available for cognify
    await cognee.add([coding_rules_chat_from_principal_engineer, coding_rules_chat_from_manager])
    print("Text added successfully.\n")

    # Use LLMs and cognee to create knowledge graph
    await cognee.cognify()
    print("Cognify process complete.\n")

    # Visualize graph after cognification
    file_path = os.path.join(
        pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_only_cognify.html"
    )
    await visualize_graph(file_path)
    print(f"Open file to see graph visualization only after cognification: {file_path}")

    # After the graph is created, run a second pipeline that walks the graph and
    # enhances it with specific coding-rule nodes.
    # extract_subgraph_chunks returns all document chunks from the specified subgraphs
    # (if no subgraph is specified, the whole graph is sent through memify).
    subgraph_extraction_tasks = [Task(extract_subgraph_chunks)]

    # add_rule_associations processes coding rules found in chunks and keeps track of
    # existing rules so duplicate rules won't be created. As a result, new Rule nodes
    # describing the coding rules found in the conversations are created in the graph.
    coding_rules_association_tasks = [
        Task(
            add_rule_associations,
            rules_nodeset_name="coding_agent_rules",
            task_config={"batch_size": 1},
        ),
    ]

    # Memify accepts these tasks and orchestrates forwarding of graph data through them
    # (if data is not specified). If data is explicitly passed in the arguments, that
    # data is forwarded through the tasks instead.
    await memify(
        data_streaming_tasks=subgraph_extraction_tasks,
        data_processing_tasks=coding_rules_association_tasks,
    )

    # Find the new coding rules added to the graph through memify
    # (created from the chat conversations between team members).
    developer_rules = await get_existing_rules(rules_nodeset_name="coding_agent_rules")
    print(developer_rules)

    # Visualize the new graph with the added memify context
    file_path = os.path.join(
        pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_after_memify.html"
    )
    await visualize_graph(file_path)
    print(f"Open file to see graph visualization after memify enhancement: {file_path}")
if __name__ == "__main__":
logger = setup_logging(log_level=ERROR)
loop = asyncio.new_event_loop()
asyncio.set_event_loop(loop)
try:
loop.run_until_complete(main())
finally:
loop.run_until_complete(loop.shutdown_asyncgens())