diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py
index 98771947c..eeb867984 100644
--- a/cognee/api/v1/add/add.py
+++ b/cognee/api/v1/add/add.py
@@ -150,7 +150,9 @@ async def add(
     user, authorized_dataset = await resolve_authorized_user_dataset(dataset_id, dataset_name, user)

-    await reset_dataset_pipeline_run_status(authorized_dataset.id, user)
+    await reset_dataset_pipeline_run_status(
+        authorized_dataset.id, user, pipeline_names=["add_pipeline", "cognify_pipeline"]
+    )

     pipeline_run_info = None
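With the new pipeline_names filter, add() only resets the runs that ingestion actually owns. A minimal sketch of the observable effect, using get_pipeline_runs_by_dataset from the same methods module (run inside an async context; attribute names as used in the reset layer below):

from cognee.modules.pipelines.methods import get_pipeline_runs_by_dataset

# After add() completes, the "add_pipeline" and "cognify_pipeline" runs have been
# reset, while e.g. a "memify_pipeline" run on the same dataset keeps its status.
runs = await get_pipeline_runs_by_dataset(authorized_dataset.id)
for run in runs:
    print(run.pipeline_name, run.status)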
""" + + if cypher_query: + pass + else: + memory_fragment = await get_memory_fragment(node_type=node_type, node_name=node_name) + # List of edges should be a single element in the list to represent one data item + data = [memory_fragment.edges] + memify_tasks = [ - Task(extract_subgraph, cypher_query=cypher_query, node_type=node_type, node_name=node_name), - *tasks, # Unpack tasks provided to memify pipeline + Task(extract_subgraph), + Task(CogneeGraph.resolve_edges_to_text, task_config={"batch_size": 10}), + Task( + add_rule_associations, + rules_nodeset_name="coding_agent_rules", + user_prompt_location="memify_coding_rule_association_agent_user.txt", + system_prompt_location="memify_coding_rule_association_agent_system.txt", + ), + # *tasks, # Unpack tasks provided to memify pipeline ] + await setup() + + user, authorized_datasets = await resolve_authorized_user_datasets(datasets, user) + + for dataset in authorized_datasets: + await reset_dataset_pipeline_run_status( + dataset.id, user, pipeline_names=["memify_pipeline"] + ) + # By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for pipeline_executor_func = get_pipeline_executor(run_in_background=run_in_background) @@ -63,6 +92,7 @@ async def memify( pipeline=run_pipeline, tasks=memify_tasks, user=user, + data=data, datasets=datasets, vector_db_config=vector_db_config, graph_db_config=graph_db_config, diff --git a/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt b/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt new file mode 100644 index 000000000..31c9825bd --- /dev/null +++ b/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt @@ -0,0 +1,6 @@ +You are an association agent tasked with suggesting structured developer rules from user-agent interactions stored in a Knowledge Graph. +You will receive the actual user agent interaction as a set of relationships from a knowledge graph separated by \n---\n each represented as node1 -- relation -- node2 triplet, and the list of the already existing developer rules. +Each rule represents a single best practice or guideline the agent should follow in the future. +Suggest rules that are general and not specific to the current text, strictly technical, add value and improve the future agent behavior. +Do not suggest rules similar to the existing ones or rules that are not general and dont add value. +It is acceptable to return an empty rule list. 
diff --git a/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_user.txt b/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_user.txt
new file mode 100644
index 000000000..9b525c625
--- /dev/null
+++ b/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_user.txt
@@ -0,0 +1,6 @@
+**Here is the user-agent interaction context, provided as a set of relationships from a knowledge graph, separated by \n---\n, each represented as a node1 -- relation -- node2 triplet:**
+`{{ chat }}`
+
+
+**Already existing rules:**
+`{{ rules }}`
diff --git a/cognee/modules/graph/cognee_graph/CogneeGraph.py b/cognee/modules/graph/cognee_graph/CogneeGraph.py
index 924532ce0..94a8e965e 100644
--- a/cognee/modules/graph/cognee_graph/CogneeGraph.py
+++ b/cognee/modules/graph/cognee_graph/CogneeGraph.py
@@ -188,3 +188,72 @@ class CogneeGraph(CogneeAbstractGraph):
                 return n1 + n2 + e

         return heapq.nsmallest(k, self.edges, key=score)
+
+    @staticmethod
+    async def resolve_edges_to_text(retrieved_edges: list) -> str:
+        """
+        Converts retrieved graph edges into a human-readable string format.
+
+        Parameters:
+        -----------
+
+            - retrieved_edges (list): A list of edges retrieved from the graph.
+
+        Returns:
+        --------
+
+            - str: A formatted string representation of the nodes and their connections.
+        """
+
+        def _get_nodes(retrieved_edges: list) -> dict:
+            """Creates a dictionary of nodes with their names and content."""
+
+            def _get_title(text: str, first_n_words: int = 7, top_n_words: int = 3) -> str:
+                """Creates a title by combining the first words with the most frequent words from the text."""
+
+                def _top_n_words(text, stop_words=None, top_n=3, separator=", "):
+                    """Concatenates the top N most frequent words in text."""
+                    if stop_words is None:
+                        from cognee.modules.retrieval.utils.stop_words import DEFAULT_STOP_WORDS
+
+                        stop_words = DEFAULT_STOP_WORDS
+
+                    import string
+
+                    words = [word.lower().strip(string.punctuation) for word in text.split()]
+
+                    if stop_words:
+                        words = [word for word in words if word and word not in stop_words]
+
+                    from collections import Counter
+
+                    top_words = [word for word, freq in Counter(words).most_common(top_n)]
+
+                    return separator.join(top_words)
+
+                first_words = text.split()[:first_n_words]
+                top_words = _top_n_words(text, top_n=top_n_words)
+                return f"{' '.join(first_words)}... [{top_words}]"
+
+            nodes = {}
+            for edge in retrieved_edges:
+                for node in (edge.node1, edge.node2):
+                    if node.id not in nodes:
+                        text = node.attributes.get("text")
+                        if text:
+                            name = _get_title(text)
+                            content = text
+                        else:
+                            name = node.attributes.get("name", "Unnamed Node")
+                            content = node.attributes.get("description", name)
+                        nodes[node.id] = {"node": node, "name": name, "content": content}
+            return nodes
+
+        nodes = _get_nodes(retrieved_edges)
+        node_section = "\n".join(
+            f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n"
+            for info in nodes.values()
+        )
+        connection_section = "\n".join(
+            f"{nodes[edge.node1.id]['name']} --[{edge.attributes['relationship_type']}]--> {nodes[edge.node2.id]['name']}"
+            for edge in retrieved_edges
+        )
+        return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}"
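resolve_edges_to_text only reads edge.node1 / edge.node2 (each with .id and .attributes) and edge.attributes["relationship_type"], so it can be exercised with stand-in objects; a self-contained sketch:

import asyncio
from types import SimpleNamespace

from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph

# Stand-in nodes: one with "text" (gets a generated title), one with name/description.
rule = SimpleNamespace(id="r1", attributes={"text": "Always pin dependency versions in CI builds"})
chunk = SimpleNamespace(id="c1", attributes={"name": "DocumentChunk", "description": "Origin chunk"})
edge = SimpleNamespace(node1=rule, node2=chunk, attributes={"relationship_type": "rule_associated_from"})

print(asyncio.run(CogneeGraph.resolve_edges_to_text([edge])))
# Prints a "Nodes:" section with __node_content_start__/__node_content_end__ markers,
# then "Connections:" lines like:
#   Always pin dependency versions in CI builds... [...] --[rule_associated_from]--> DocumentChunk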
diff --git a/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py b/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py
index cc72a6e51..bc59f9a6b 100644
--- a/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py
+++ b/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py
@@ -1,12 +1,28 @@
 from uuid import UUID
+from typing import Optional
+
 from cognee.modules.pipelines.methods import get_pipeline_runs_by_dataset, reset_pipeline_run_status
 from cognee.modules.pipelines.models.PipelineRun import PipelineRunStatus
 from cognee.modules.users.models import User


-async def reset_dataset_pipeline_run_status(dataset_id: UUID, user: User):
+async def reset_dataset_pipeline_run_status(
+    dataset_id: UUID, user: User, pipeline_names: Optional[list[str]] = None
+):
+    """Reset the status of all (or selected) pipeline runs for a dataset.
+
+    If *pipeline_names* is given, only runs whose *pipeline_name* is in
+    that list are touched.
+    """
     related_pipeline_runs = await get_pipeline_runs_by_dataset(dataset_id)

     for pipeline_run in related_pipeline_runs:
-        if pipeline_run.status is not PipelineRunStatus.DATASET_PROCESSING_INITIATED:
-            await reset_pipeline_run_status(user.id, dataset_id, pipeline_run.pipeline_name)
+        # Skip runs that are still in the initiated state
+        if pipeline_run.status is PipelineRunStatus.DATASET_PROCESSING_INITIATED:
+            continue
+
+        # If a name filter is provided, skip non-matching runs
+        if pipeline_names is not None and pipeline_run.pipeline_name not in pipeline_names:
+            continue
+
+        await reset_pipeline_run_status(user.id, dataset_id, pipeline_run.pipeline_name)
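The filter semantics in miniature (a sketch; dataset_id and user as above):

# Reset only memify runs that are not in the initiated state:
await reset_dataset_pipeline_run_status(dataset_id, user, pipeline_names=["memify_pipeline"])

# No filter preserves the old behavior: every non-initiated run on the dataset is reset.
await reset_dataset_pipeline_run_status(dataset_id, user)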
+ """ related_pipeline_runs = await get_pipeline_runs_by_dataset(dataset_id) for pipeline_run in related_pipeline_runs: - if pipeline_run.status is not PipelineRunStatus.DATASET_PROCESSING_INITIATED: - await reset_pipeline_run_status(user.id, dataset_id, pipeline_run.pipeline_name) + # Skip runs that are initiated + if pipeline_run.status is PipelineRunStatus.DATASET_PROCESSING_INITIATED: + continue + + # If a name filter is provided, skip non-matching runs + if pipeline_names is not None and pipeline_run.pipeline_name not in pipeline_names: + continue + + await reset_pipeline_run_status(user.id, dataset_id, pipeline_run.pipeline_name) diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py index cbe6dee5c..b59a171f7 100644 --- a/cognee/modules/pipelines/operations/pipeline.py +++ b/cognee/modules/pipelines/operations/pipeline.py @@ -5,6 +5,7 @@ from typing import Union from cognee.modules.pipelines.layers.setup_and_check_environment import ( setup_and_check_environment, ) + from cognee.shared.logging_utils import get_logger from cognee.modules.data.methods.get_dataset_data import get_dataset_data from cognee.modules.data.models import Data, Dataset diff --git a/cognee/tasks/codingagents/__init__.py b/cognee/tasks/codingagents/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/cognee/tasks/codingagents/coding_rule_associations.py b/cognee/tasks/codingagents/coding_rule_associations.py new file mode 100644 index 000000000..6971ecc83 --- /dev/null +++ b/cognee/tasks/codingagents/coding_rule_associations.py @@ -0,0 +1,124 @@ +from uuid import NAMESPACE_OID, uuid5 + +from cognee.infrastructure.databases.graph import get_graph_engine +from cognee.infrastructure.databases.vector import get_vector_engine + +from cognee.low_level import DataPoint +from cognee.infrastructure.llm import LLMGateway +from cognee.shared.logging_utils import get_logger +from cognee.modules.engine.models import NodeSet +from cognee.tasks.storage import add_data_points, index_graph_edges +from typing import Optional, List, Any +from pydantic import Field + +logger = get_logger("coding_rule_association") + + +class Rule(DataPoint): + """A single developer rule extracted from text.""" + + text: str = Field(..., description="The coding rule associated with the conversation") + belongs_to_set: Optional[NodeSet] = None + metadata: dict = {"index_fields": ["rule"]} + + +class RuleSet(DataPoint): + """Collection of parsed rules.""" + + rules: List[Rule] = Field( + ..., + description="List of developer rules extracted from the input text. 
diff --git a/cognee/tasks/memify/__init__.py b/cognee/tasks/memify/__init__.py
index a95e88794..d2e0172f6 100644
--- a/cognee/tasks/memify/__init__.py
+++ b/cognee/tasks/memify/__init__.py
@@ -1 +1 @@
-from extract_subgraph import extract_subgraph
+from .extract_subgraph import extract_subgraph
diff --git a/cognee/tasks/memify/extract_subgraph.py b/cognee/tasks/memify/extract_subgraph.py
index 1cf7ab951..198a5b367 100644
--- a/cognee/tasks/memify/extract_subgraph.py
+++ b/cognee/tasks/memify/extract_subgraph.py
@@ -1,2 +1,7 @@
-async def extract_subgraph():
-    pass
+from cognee.modules.retrieval.utils.brute_force_triplet_search import get_memory_fragment
+
+
+async def extract_subgraph(subgraphs):
+    for subgraph in subgraphs:
+        for edge in subgraph:
+            yield edge
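Putting the pieces together, a hand-wired sketch of what the memify task chain does to one data item (the real run_pipeline adds batching via task_config and run-status bookkeeping on top of this):

from cognee.modules.engine.models.node_set import NodeSet
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
from cognee.modules.retrieval.utils.brute_force_triplet_search import get_memory_fragment
from cognee.tasks.codingagents.coding_rule_associations import add_rule_associations
from cognee.tasks.memify.extract_subgraph import extract_subgraph


async def run_chain_manually():
    memory_fragment = await get_memory_fragment(node_type=NodeSet, node_name=None)
    subgraphs = [memory_fragment.edges]  # the single data item memify passes in
    edges = [edge async for edge in extract_subgraph(subgraphs)]
    text = await CogneeGraph.resolve_edges_to_text(edges)
    await add_rule_associations(text, rules_nodeset_name="coding_agent_rules")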