From 72e5b2bec877c8c8d4775a1ff780673604c6ac92 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Mon, 1 Sep 2025 17:48:50 +0200 Subject: [PATCH 01/14] feat: Initial memify commit --- cognee/api/v1/cognify/memify.py | 71 +++++++++++++++++++++++++ cognee/tasks/memify/__init__.py | 1 + cognee/tasks/memify/extract_subgraph.py | 2 + 3 files changed, 74 insertions(+) create mode 100644 cognee/api/v1/cognify/memify.py create mode 100644 cognee/tasks/memify/__init__.py create mode 100644 cognee/tasks/memify/extract_subgraph.py diff --git a/cognee/api/v1/cognify/memify.py b/cognee/api/v1/cognify/memify.py new file mode 100644 index 000000000..65a622af7 --- /dev/null +++ b/cognee/api/v1/cognify/memify.py @@ -0,0 +1,71 @@ +from pydantic import BaseModel +from typing import Union, Optional, List, Type +from uuid import UUID + +from cognee.shared.logging_utils import get_logger +from cognee.shared.data_models import KnowledgeGraph +from cognee.infrastructure.llm import get_max_chunk_tokens + +from cognee.modules.engine.models.node_set import NodeSet +from cognee.modules.pipelines import run_pipeline +from cognee.modules.pipelines.tasks.task import Task +from cognee.modules.chunking.TextChunker import TextChunker +from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver +from cognee.modules.users.models import User + +from cognee.tasks.memify import extract_subgraph +from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor + +logger = get_logger("memify") + + +async def memify( + datasets: Union[str, list[str], list[UUID]] = None, + user: User = None, + tasks: List[Task] = None, + node_type: Optional[Type] = NodeSet, + node_name: Optional[List[str]] = None, + cypher_query: Optional[str] = None, + vector_db_config: dict = None, + graph_db_config: dict = None, + run_in_background: bool = False, +): + """ + Prerequisites: + - **LLM_API_KEY**: Must be configured (required for entity extraction and graph generation) + - **Data Added**: Must have data previously added via `cognee.add()` and `cognee.cognify()` + - **Vector Database**: Must be accessible for embeddings storage + - **Graph Database**: Must be accessible for relationship storage + + Args: + datasets: Dataset name(s) or dataset uuid to process. Processes all available data if None. + - Single dataset: "my_dataset" + - Multiple datasets: ["docs", "research", "reports"] + - None: Process all datasets for the user + user: User context for authentication and data access. Uses default if None. + vector_db_config: Custom vector database configuration for embeddings storage. + graph_db_config: Custom graph database configuration for relationship storage. + run_in_background: If True, starts processing asynchronously and returns immediately. + If False, waits for completion before returning. + Background mode recommended for large datasets (>100MB). + Use pipeline_run_id from return value to monitor progress. 
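+
+    Example:
+        Illustrative sketch only -- assumes data was already added with
+        ``cognee.add`` and processed with ``cognee.cognify``;
+        ``my_enrichment_task`` stands in for any user-defined coroutine:
+
+            await memify(
+                datasets="my_dataset",
+                tasks=[Task(my_enrichment_task)],
+            )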
+ """ + memify_tasks = [ + Task(extract_subgraph, cypher_query=cypher_query, node_type=node_type, node_name=node_name), + *tasks, # Unpack tasks provided to memify pipeline + ] + + # By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for + pipeline_executor_func = get_pipeline_executor(run_in_background=run_in_background) + + # Run the run_pipeline in the background or blocking based on executor + return await pipeline_executor_func( + pipeline=run_pipeline, + tasks=memify_tasks, + user=user, + datasets=datasets, + vector_db_config=vector_db_config, + graph_db_config=graph_db_config, + incremental_loading=False, + pipeline_name="memify_pipeline", + ) diff --git a/cognee/tasks/memify/__init__.py b/cognee/tasks/memify/__init__.py new file mode 100644 index 000000000..a95e88794 --- /dev/null +++ b/cognee/tasks/memify/__init__.py @@ -0,0 +1 @@ +from extract_subgraph import extract_subgraph diff --git a/cognee/tasks/memify/extract_subgraph.py b/cognee/tasks/memify/extract_subgraph.py new file mode 100644 index 000000000..1cf7ab951 --- /dev/null +++ b/cognee/tasks/memify/extract_subgraph.py @@ -0,0 +1,2 @@ +async def extract_subgraph(): + pass From af084af70fe8fc940aacea27f16cd400611932e0 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Tue, 2 Sep 2025 21:32:09 +0200 Subject: [PATCH 02/14] feat: Memify pipeline initial commit --- cognee/api/v1/add/add.py | 4 +- cognee/api/v1/cognify/memify.py | 48 +++++-- ...y_coding_rule_association_agent_system.txt | 6 + ...ify_coding_rule_association_agent_user.txt | 6 + .../modules/graph/cognee_graph/CogneeGraph.py | 69 ++++++++++ .../reset_dataset_pipeline_run_status.py | 22 +++- .../modules/pipelines/operations/pipeline.py | 1 + cognee/tasks/codingagents/__init__.py | 0 .../codingagents/coding_rule_associations.py | 124 ++++++++++++++++++ cognee/tasks/memify/__init__.py | 2 +- cognee/tasks/memify/extract_subgraph.py | 9 +- 11 files changed, 275 insertions(+), 16 deletions(-) create mode 100644 cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt create mode 100644 cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_user.txt create mode 100644 cognee/tasks/codingagents/__init__.py create mode 100644 cognee/tasks/codingagents/coding_rule_associations.py diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py index 98771947c..eeb867984 100644 --- a/cognee/api/v1/add/add.py +++ b/cognee/api/v1/add/add.py @@ -150,7 +150,9 @@ async def add( user, authorized_dataset = await resolve_authorized_user_dataset(dataset_id, dataset_name, user) - await reset_dataset_pipeline_run_status(authorized_dataset.id, user) + await reset_dataset_pipeline_run_status( + authorized_dataset.id, user, pipeline_names=["add_pipeline", "cognify_pipeline"] + ) pipeline_run_info = None diff --git a/cognee/api/v1/cognify/memify.py b/cognee/api/v1/cognify/memify.py index 65a622af7..7e35ef5dc 100644 --- a/cognee/api/v1/cognify/memify.py +++ b/cognee/api/v1/cognify/memify.py @@ -1,28 +1,33 @@ -from pydantic import BaseModel from typing import Union, Optional, List, Type from uuid import UUID from cognee.shared.logging_utils import get_logger -from cognee.shared.data_models import KnowledgeGraph -from cognee.infrastructure.llm import get_max_chunk_tokens +from cognee.modules.retrieval.utils.brute_force_triplet_search import get_memory_fragment +from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph from 
cognee.modules.engine.models.node_set import NodeSet from cognee.modules.pipelines import run_pipeline from cognee.modules.pipelines.tasks.task import Task -from cognee.modules.chunking.TextChunker import TextChunker -from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver from cognee.modules.users.models import User +from cognee.modules.pipelines.layers.resolve_authorized_user_datasets import ( + resolve_authorized_user_datasets, +) +from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import ( + reset_dataset_pipeline_run_status, +) +from cognee.modules.engine.operations.setup import setup -from cognee.tasks.memify import extract_subgraph +from cognee.tasks.memify.extract_subgraph import extract_subgraph +from cognee.tasks.codingagents.coding_rule_associations import add_rule_associations from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor logger = get_logger("memify") async def memify( + tasks: List[Task], datasets: Union[str, list[str], list[UUID]] = None, user: User = None, - tasks: List[Task] = None, node_type: Optional[Type] = NodeSet, node_name: Optional[List[str]] = None, cypher_query: Optional[str] = None, @@ -50,11 +55,35 @@ async def memify( Background mode recommended for large datasets (>100MB). Use pipeline_run_id from return value to monitor progress. """ + + if cypher_query: + pass + else: + memory_fragment = await get_memory_fragment(node_type=node_type, node_name=node_name) + # List of edges should be a single element in the list to represent one data item + data = [memory_fragment.edges] + memify_tasks = [ - Task(extract_subgraph, cypher_query=cypher_query, node_type=node_type, node_name=node_name), - *tasks, # Unpack tasks provided to memify pipeline + Task(extract_subgraph), + Task(CogneeGraph.resolve_edges_to_text, task_config={"batch_size": 10}), + Task( + add_rule_associations, + rules_nodeset_name="coding_agent_rules", + user_prompt_location="memify_coding_rule_association_agent_user.txt", + system_prompt_location="memify_coding_rule_association_agent_system.txt", + ), + # *tasks, # Unpack tasks provided to memify pipeline ] + await setup() + + user, authorized_datasets = await resolve_authorized_user_datasets(datasets, user) + + for dataset in authorized_datasets: + await reset_dataset_pipeline_run_status( + dataset.id, user, pipeline_names=["memify_pipeline"] + ) + # By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for pipeline_executor_func = get_pipeline_executor(run_in_background=run_in_background) @@ -63,6 +92,7 @@ async def memify( pipeline=run_pipeline, tasks=memify_tasks, user=user, + data=data, datasets=datasets, vector_db_config=vector_db_config, graph_db_config=graph_db_config, diff --git a/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt b/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt new file mode 100644 index 000000000..31c9825bd --- /dev/null +++ b/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt @@ -0,0 +1,6 @@ +You are an association agent tasked with suggesting structured developer rules from user-agent interactions stored in a Knowledge Graph. +You will receive the actual user agent interaction as a set of relationships from a knowledge graph separated by \n---\n each represented as node1 -- relation -- node2 triplet, and the list of the already existing developer rules. 
+Each rule represents a single best practice or guideline the agent should follow in the future.
+Suggest rules that are general and not specific to the current text, strictly technical, add value and improve the future agent behavior.
+Do not suggest rules similar to the existing ones or rules that are not general and don't add value.
+It is acceptable to return an empty rule list.
diff --git a/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_user.txt b/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_user.txt
new file mode 100644
index 000000000..9b525c625
--- /dev/null
+++ b/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_user.txt
@@ -0,0 +1,6 @@
+**Here is the User-agent interaction context provided with a set of relationships from a knowledge graph separated by \n---\n each represented as node1 -- relation -- node2 triplet:**
+`{{ chat }}`
+
+
+**Already existing rules:**
+`{{ rules }}`
diff --git a/cognee/modules/graph/cognee_graph/CogneeGraph.py b/cognee/modules/graph/cognee_graph/CogneeGraph.py
index 924532ce0..94a8e965e 100644
--- a/cognee/modules/graph/cognee_graph/CogneeGraph.py
+++ b/cognee/modules/graph/cognee_graph/CogneeGraph.py
@@ -188,3 +188,72 @@ class CogneeGraph(CogneeAbstractGraph):
             return n1 + n2 + e
 
         return heapq.nsmallest(k, self.edges, key=score)
+
+    @staticmethod
+    async def resolve_edges_to_text(retrieved_edges: list) -> str:
+        """
+        Converts retrieved graph edges into a human-readable string format.
+
+        Parameters:
+        -----------
+
+        - retrieved_edges (list): A list of edges retrieved from the graph.
+
+        Returns:
+        --------
+
+        - str: A formatted string representation of the nodes and their connections.
+        """
+
+        def _get_nodes(retrieved_edges: list) -> dict:
+            def _get_title(text: str, first_n_words: int = 7, top_n_words: int = 3) -> str:
+                def _top_n_words(text, stop_words=None, top_n=3, separator=", "):
+                    """Concatenates the top N frequent words in text."""
+                    if stop_words is None:
+                        from cognee.modules.retrieval.utils.stop_words import DEFAULT_STOP_WORDS
+
+                        stop_words = DEFAULT_STOP_WORDS
+
+                    import string
+
+                    words = [word.lower().strip(string.punctuation) for word in text.split()]
+
+                    if stop_words:
+                        words = [word for word in words if word and word not in stop_words]
+
+                    from collections import Counter
+
+                    top_words = [word for word, freq in Counter(words).most_common(top_n)]
+
+                    return separator.join(top_words)
+
+                """Creates a title, by combining first words with most frequent words from the text."""
+                first_n_words = text.split()[:first_n_words]
+                top_n_words = _top_n_words(text, top_n=top_n_words)
+                return f"{' '.join(first_n_words)}... 
[{top_n_words}]" + + """Creates a dictionary of nodes with their names and content.""" + nodes = {} + for edge in retrieved_edges: + for node in (edge.node1, edge.node2): + if node.id not in nodes: + text = node.attributes.get("text") + if text: + name = _get_title(text) + content = text + else: + name = node.attributes.get("name", "Unnamed Node") + content = node.attributes.get("description", name) + nodes[node.id] = {"node": node, "name": name, "content": content} + return nodes + + nodes = _get_nodes(retrieved_edges) + node_section = "\n".join( + f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n" + for info in nodes.values() + ) + connection_section = "\n".join( + f"{nodes[edge.node1.id]['name']} --[{edge.attributes['relationship_type']}]--> {nodes[edge.node2.id]['name']}" + for edge in retrieved_edges + ) + return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}" diff --git a/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py b/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py index cc72a6e51..bc59f9a6b 100644 --- a/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py +++ b/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py @@ -1,12 +1,28 @@ from uuid import UUID +from typing import Optional, List + from cognee.modules.pipelines.methods import get_pipeline_runs_by_dataset, reset_pipeline_run_status from cognee.modules.pipelines.models.PipelineRun import PipelineRunStatus from cognee.modules.users.models import User -async def reset_dataset_pipeline_run_status(dataset_id: UUID, user: User): +async def reset_dataset_pipeline_run_status( + dataset_id: UUID, user: User, pipeline_names: Optional[list[str]] = None +): + """Reset the status of all (or selected) pipeline runs for a dataset. + + If *pipeline_names* is given, only runs whose *pipeline_name* is in + that list are touched. 
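+
+    Example (illustrative; mirrors the call in ``cognee.add``, assuming an
+    authorized dataset and user were already resolved):
+
+        await reset_dataset_pipeline_run_status(
+            authorized_dataset.id, user, pipeline_names=["add_pipeline", "cognify_pipeline"]
+        )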
+    """
     related_pipeline_runs = await get_pipeline_runs_by_dataset(dataset_id)

     for pipeline_run in related_pipeline_runs:
-        if pipeline_run.status is not PipelineRunStatus.DATASET_PROCESSING_INITIATED:
-            await reset_pipeline_run_status(user.id, dataset_id, pipeline_run.pipeline_name)
+        # Skip runs that are initiated
+        if pipeline_run.status is PipelineRunStatus.DATASET_PROCESSING_INITIATED:
+            continue
+
+        # If a name filter is provided, skip non-matching runs
+        if pipeline_names is not None and pipeline_run.pipeline_name not in pipeline_names:
+            continue
+
+        await reset_pipeline_run_status(user.id, dataset_id, pipeline_run.pipeline_name)
diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py
index cbe6dee5c..b59a171f7 100644
--- a/cognee/modules/pipelines/operations/pipeline.py
+++ b/cognee/modules/pipelines/operations/pipeline.py
@@ -5,6 +5,7 @@ from typing import Union
 from cognee.modules.pipelines.layers.setup_and_check_environment import (
     setup_and_check_environment,
 )
+
 from cognee.shared.logging_utils import get_logger
 from cognee.modules.data.methods.get_dataset_data import get_dataset_data
 from cognee.modules.data.models import Data, Dataset
diff --git a/cognee/tasks/codingagents/__init__.py b/cognee/tasks/codingagents/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/cognee/tasks/codingagents/coding_rule_associations.py b/cognee/tasks/codingagents/coding_rule_associations.py
new file mode 100644
index 000000000..6971ecc83
--- /dev/null
+++ b/cognee/tasks/codingagents/coding_rule_associations.py
@@ -0,0 +1,124 @@
+from uuid import NAMESPACE_OID, uuid5
+
+from cognee.infrastructure.databases.graph import get_graph_engine
+from cognee.infrastructure.databases.vector import get_vector_engine
+
+from cognee.low_level import DataPoint
+from cognee.infrastructure.llm import LLMGateway
+from cognee.shared.logging_utils import get_logger
+from cognee.modules.engine.models import NodeSet
+from cognee.tasks.storage import add_data_points, index_graph_edges
+from typing import Optional, List, Any
+from pydantic import Field
+
+logger = get_logger("coding_rule_association")
+
+
+class Rule(DataPoint):
+    """A single developer rule extracted from text."""
+
+    text: str = Field(..., description="The coding rule associated with the conversation")
+    belongs_to_set: Optional[NodeSet] = None
+    metadata: dict = {"index_fields": ["text"]}
+
+
+class RuleSet(DataPoint):
+    """Collection of parsed rules."""
+
+    rules: List[Rule] = Field(
+        ...,
+        description="List of developer rules extracted from the input text. 
Each rule represents a coding best practice or guideline.", + ) + + +async def get_existing_rules(rules_nodeset_name: str) -> str: + graph_engine = await get_graph_engine() + nodes_data, _ = await graph_engine.get_nodeset_subgraph( + node_type=NodeSet, node_name=[rules_nodeset_name] + ) + + existing_rules = [ + item[1]["text"] + for item in nodes_data + if isinstance(item, tuple) + and len(item) == 2 + and isinstance(item[1], dict) + and "text" in item[1] + ] + + existing_rules = "\n".join(f"- {rule}" for rule in existing_rules) + + return existing_rules + + +async def get_origin_edges(data: str, rules: List[Rule]) -> list[Any]: + vector_engine = get_vector_engine() + + origin_chunk = await vector_engine.search("DocumentChunk_text", data, limit=1) + + try: + origin_id = origin_chunk[0].id + except (AttributeError, KeyError, TypeError, IndexError): + origin_id = None + + relationships = [] + + if origin_id and isinstance(rules, (list, tuple)) and len(rules) > 0: + for rule in rules: + try: + rule_id = getattr(rule, "id", None) + if rule_id is not None: + rel_name = "rule_associated_from" + relationships.append( + ( + rule_id, + origin_id, + rel_name, + { + "relationship_name": rel_name, + "source_node_id": rule_id, + "target_node_id": origin_id, + "ontology_valid": False, + }, + ) + ) + except Exception as e: + logger.info(f"Warning: Skipping invalid rule due to error: {e}") + else: + logger.info("No valid origin_id or rules provided.") + + return relationships + + +async def add_rule_associations( + data: str, + rules_nodeset_name: str, + user_prompt_location: str = "coding_rule_association_agent_user.txt", + system_prompt_location: str = "coding_rule_association_agent_system.txt", +): + graph_engine = await get_graph_engine() + existing_rules = await get_existing_rules(rules_nodeset_name=rules_nodeset_name) + + user_context = {"user data": data, "rules": existing_rules} + + user_prompt = LLMGateway.render_prompt(user_prompt_location, context=user_context) + system_prompt = LLMGateway.render_prompt(system_prompt_location, context={}) + + rule_list = await LLMGateway.acreate_structured_output( + text_input=user_prompt, system_prompt=system_prompt, response_model=RuleSet + ) + + rules_nodeset = NodeSet( + id=uuid5(NAMESPACE_OID, name=rules_nodeset_name), name=rules_nodeset_name + ) + for rule in rule_list.rules: + rule.belongs_to_set = rules_nodeset + + edges_to_save = await get_origin_edges(data=data, rules=rule_list.rules) + + await add_data_points(data_points=rule_list.rules) + + if len(edges_to_save) > 0: + await graph_engine.add_edges(edges_to_save) + + await index_graph_edges() diff --git a/cognee/tasks/memify/__init__.py b/cognee/tasks/memify/__init__.py index a95e88794..d2e0172f6 100644 --- a/cognee/tasks/memify/__init__.py +++ b/cognee/tasks/memify/__init__.py @@ -1 +1 @@ -from extract_subgraph import extract_subgraph +from .extract_subgraph import extract_subgraph diff --git a/cognee/tasks/memify/extract_subgraph.py b/cognee/tasks/memify/extract_subgraph.py index 1cf7ab951..198a5b367 100644 --- a/cognee/tasks/memify/extract_subgraph.py +++ b/cognee/tasks/memify/extract_subgraph.py @@ -1,2 +1,7 @@ -async def extract_subgraph(): - pass +from cognee.modules.retrieval.utils.brute_force_triplet_search import get_memory_fragment + + +async def extract_subgraph(subgraphs): + for subgraph in subgraphs: + for edge in subgraph: + yield edge From 1a2977779f49001c5696330b005a3c90d75f6b7f Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 3 Sep 2025 12:03:17 +0200 Subject: [PATCH 03/14] feat: 
Add memify coding agent example --- cognee/api/v1/cognify/memify.py | 12 +-- .../python/memify_coding_agent_example.py | 76 +++++++++++++++++++ 2 files changed, 78 insertions(+), 10 deletions(-) create mode 100644 examples/python/memify_coding_agent_example.py diff --git a/cognee/api/v1/cognify/memify.py b/cognee/api/v1/cognify/memify.py index 7e35ef5dc..df45bac76 100644 --- a/cognee/api/v1/cognify/memify.py +++ b/cognee/api/v1/cognify/memify.py @@ -4,7 +4,7 @@ from uuid import UUID from cognee.shared.logging_utils import get_logger from cognee.modules.retrieval.utils.brute_force_triplet_search import get_memory_fragment -from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph + from cognee.modules.engine.models.node_set import NodeSet from cognee.modules.pipelines import run_pipeline from cognee.modules.pipelines.tasks.task import Task @@ -18,7 +18,6 @@ from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import ( from cognee.modules.engine.operations.setup import setup from cognee.tasks.memify.extract_subgraph import extract_subgraph -from cognee.tasks.codingagents.coding_rule_associations import add_rule_associations from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor logger = get_logger("memify") @@ -65,14 +64,7 @@ async def memify( memify_tasks = [ Task(extract_subgraph), - Task(CogneeGraph.resolve_edges_to_text, task_config={"batch_size": 10}), - Task( - add_rule_associations, - rules_nodeset_name="coding_agent_rules", - user_prompt_location="memify_coding_rule_association_agent_user.txt", - system_prompt_location="memify_coding_rule_association_agent_system.txt", - ), - # *tasks, # Unpack tasks provided to memify pipeline + *tasks, # Unpack tasks provided to memify pipeline ] await setup() diff --git a/examples/python/memify_coding_agent_example.py b/examples/python/memify_coding_agent_example.py new file mode 100644 index 000000000..70064c346 --- /dev/null +++ b/examples/python/memify_coding_agent_example.py @@ -0,0 +1,76 @@ +import asyncio +import cognee +from cognee.shared.logging_utils import setup_logging, ERROR +from cognee.api.v1.search import SearchType + +# Prerequisites: +# 1. Copy `.env.template` and rename it to `.env`. +# 2. Add your OpenAI API key to the `.env` file in the `LLM_API_KEY` field: +# LLM_API_KEY = "your_key_here" + + +async def main(): + # Create a clean slate for cognee -- reset data and system state + print("Resetting cognee data...") + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + print("Data reset complete.\n") + + # cognee knowledge graph will be created based on this text + text = """ + Natural language processing (NLP) is an interdisciplinary + subfield of computer science and information retrieval. + """ + + coding_rules_text = """ + Code must be formatted by PEP8 standards. + Typing and Docstrings must be added. 
+ """ + + print("Adding text to cognee:") + print(text.strip()) + # Add the text, and make it available for cognify + await cognee.add(text) + await cognee.add(coding_rules_text, node_set=["coding_rules"]) + print("Text added successfully.\n") + + # Use LLMs and cognee to create knowledge graph + await cognee.cognify() + print("Cognify process complete.\n") + + from cognee.api.v1.cognify.memify import memify + + from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph + from cognee.tasks.codingagents.coding_rule_associations import add_rule_associations + from cognee.modules.pipelines.tasks.task import Task + + memify_tasks = [ + Task(CogneeGraph.resolve_edges_to_text, task_config={"batch_size": 10}), + Task( + add_rule_associations, + rules_nodeset_name="coding_agent_rules", + user_prompt_location="memify_coding_rule_association_agent_user.txt", + system_prompt_location="memify_coding_rule_association_agent_system.txt", + ), + ] + + await memify(tasks=memify_tasks, node_name=["coding_rules"]) + + import os + import pathlib + from cognee.api.v1.visualize.visualize import visualize_graph + + file_path = os.path.join( + pathlib.Path(__file__).parent, ".artifacts", "graph_visualization.html" + ) + await visualize_graph(file_path) + + +if __name__ == "__main__": + logger = setup_logging(log_level=ERROR) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(main()) + finally: + loop.run_until_complete(loop.shutdown_asyncgens()) From 2847569616cb47fa6f76c511d2d654a399dc24f1 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Wed, 3 Sep 2025 16:08:32 +0200 Subject: [PATCH 04/14] feat: memify next iteration --- cognee/api/v1/cognify/memify.py | 28 +++-- ...y_coding_rule_association_agent_system.txt | 2 +- .../modules/graph/cognee_graph/CogneeGraph.py | 69 ------------ cognee/modules/graph/utils/__init__.py | 1 + .../graph/utils/resolve_edges_to_text.py | 67 +++++++++++ .../retrieval/graph_completion_retriever.py | 48 +------- .../codingagents/coding_rule_associations.py | 6 +- cognee/tasks/memify/__init__.py | 1 + cognee/tasks/memify/extract_subgraph.py | 6 +- .../tasks/memify/extract_subgraph_chunks.py | 11 ++ .../python/memify_coding_agent_example.py | 34 ++++-- .../memify_coding_agent_example_chunks.py | 106 ++++++++++++++++++ 12 files changed, 235 insertions(+), 144 deletions(-) create mode 100644 cognee/modules/graph/utils/resolve_edges_to_text.py create mode 100644 cognee/tasks/memify/extract_subgraph_chunks.py create mode 100644 examples/python/memify_coding_agent_example_chunks.py diff --git a/cognee/api/v1/cognify/memify.py b/cognee/api/v1/cognify/memify.py index df45bac76..8237059ec 100644 --- a/cognee/api/v1/cognify/memify.py +++ b/cognee/api/v1/cognify/memify.py @@ -1,4 +1,5 @@ -from typing import Union, Optional, List, Type +from typing import Union, Optional, List, Type, Any +from dataclasses import field from uuid import UUID from cognee.shared.logging_utils import get_logger @@ -16,15 +17,16 @@ from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import ( reset_dataset_pipeline_run_status, ) from cognee.modules.engine.operations.setup import setup - -from cognee.tasks.memify.extract_subgraph import extract_subgraph from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor logger = get_logger("memify") async def memify( - tasks: List[Task], + preprocessing_tasks: List[Task], + processing_tasks: List[Task] = [], + postprocessing_tasks: List[Task] = [], + data: Optional[Any] = None, 
     datasets: Union[str, list[str], list[UUID]] = None,
     user: User = None,
     node_type: Optional[Type] = NodeSet,
     node_name: Optional[List[str]] = None,
@@ -55,16 +57,18 @@
         Use pipeline_run_id from return value to monitor progress.
     """

-    if cypher_query:
-        pass
-    else:
-        memory_fragment = await get_memory_fragment(node_type=node_type, node_name=node_name)
-        # List of edges should be a single element in the list to represent one data item
-        data = [memory_fragment.edges]
+    if not data:
+        if cypher_query:
+            pass
+        else:
+            memory_fragment = await get_memory_fragment(node_type=node_type, node_name=node_name)
+            # Subgraphs should be a single element in the list to represent one data item
+            data = [memory_fragment]

     memify_tasks = [
-        Task(extract_subgraph),
-        *tasks,  # Unpack tasks provided to memify pipeline
+        *preprocessing_tasks,  # Unpack tasks provided to memify pipeline
+        *processing_tasks,
+        *postprocessing_tasks,
     ]
diff --git a/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt b/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt
index 31c9825bd..d9adf45f7 100644
--- a/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt
+++ b/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt
@@ -1,6 +1,6 @@
 You are an association agent tasked with suggesting structured developer rules from user-agent interactions stored in a Knowledge Graph.
 You will receive the actual user agent interaction as a set of relationships from a knowledge graph separated by \n---\n each represented as node1 -- relation -- node2 triplet, and the list of the already existing developer rules.
 Each rule represents a single best practice or guideline the agent should follow in the future.
-Suggest rules that are general and not specific to the current text, strictly technical, add value and improve the future agent behavior.
+Suggest rules that are general and not specific to the knowledge graph relationships, strictly technical, add value and improve the future agent behavior.
 Do not suggest rules similar to the existing ones or rules that are not general and don't add value.
 It is acceptable to return an empty rule list.
diff --git a/cognee/modules/graph/cognee_graph/CogneeGraph.py b/cognee/modules/graph/cognee_graph/CogneeGraph.py
index 94a8e965e..924532ce0 100644
--- a/cognee/modules/graph/cognee_graph/CogneeGraph.py
+++ b/cognee/modules/graph/cognee_graph/CogneeGraph.py
@@ -188,72 +188,3 @@ class CogneeGraph(CogneeAbstractGraph):
         return heapq.nsmallest(k, self.edges, key=score)
-
-    @staticmethod
-    async def resolve_edges_to_text(retrieved_edges: list) -> str:
-        """
-        Converts retrieved graph edges into a human-readable string format.
-
-        Parameters:
-        -----------
-
-        - retrieved_edges (list): A list of edges retrieved from the graph.
-
-        Returns:
-        --------
-
-        - str: A formatted string representation of the nodes and their connections. 
- """ - - def _get_nodes(retrieved_edges: list) -> dict: - def _get_title(text: str, first_n_words: int = 7, top_n_words: int = 3) -> str: - def _top_n_words(text, stop_words=None, top_n=3, separator=", "): - """Concatenates the top N frequent words in text.""" - if stop_words is None: - from cognee.modules.retrieval.utils.stop_words import DEFAULT_STOP_WORDS - - stop_words = DEFAULT_STOP_WORDS - - import string - - words = [word.lower().strip(string.punctuation) for word in text.split()] - - if stop_words: - words = [word for word in words if word and word not in stop_words] - - from collections import Counter - - top_words = [word for word, freq in Counter(words).most_common(top_n)] - - return separator.join(top_words) - - """Creates a title, by combining first words with most frequent words from the text.""" - first_n_words = text.split()[:first_n_words] - top_n_words = _top_n_words(text, top_n=top_n_words) - return f"{' '.join(first_n_words)}... [{top_n_words}]" - - """Creates a dictionary of nodes with their names and content.""" - nodes = {} - for edge in retrieved_edges: - for node in (edge.node1, edge.node2): - if node.id not in nodes: - text = node.attributes.get("text") - if text: - name = _get_title(text) - content = text - else: - name = node.attributes.get("name", "Unnamed Node") - content = node.attributes.get("description", name) - nodes[node.id] = {"node": node, "name": name, "content": content} - return nodes - - nodes = _get_nodes(retrieved_edges) - node_section = "\n".join( - f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n" - for info in nodes.values() - ) - connection_section = "\n".join( - f"{nodes[edge.node1.id]['name']} --[{edge.attributes['relationship_type']}]--> {nodes[edge.node2.id]['name']}" - for edge in retrieved_edges - ) - return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}" diff --git a/cognee/modules/graph/utils/__init__.py b/cognee/modules/graph/utils/__init__.py index d1cda2d83..ebc648495 100644 --- a/cognee/modules/graph/utils/__init__.py +++ b/cognee/modules/graph/utils/__init__.py @@ -4,3 +4,4 @@ from .get_model_instance_from_graph import get_model_instance_from_graph from .retrieve_existing_edges import retrieve_existing_edges from .convert_node_to_data_point import convert_node_to_data_point from .deduplicate_nodes_and_edges import deduplicate_nodes_and_edges +from .resolve_edges_to_text import resolve_edges_to_text diff --git a/cognee/modules/graph/utils/resolve_edges_to_text.py b/cognee/modules/graph/utils/resolve_edges_to_text.py new file mode 100644 index 000000000..56c303abc --- /dev/null +++ b/cognee/modules/graph/utils/resolve_edges_to_text.py @@ -0,0 +1,67 @@ +async def resolve_edges_to_text(retrieved_edges: list) -> str: + """ + Converts retrieved graph edges into a human-readable string format. + + Parameters: + ----------- + + - retrieved_edges (list): A list of edges retrieved from the graph. + + Returns: + -------- + + - str: A formatted string representation of the nodes and their connections. 
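+
+    Example (illustrative; assumes ``edges`` was retrieved beforehand, e.g. via
+    ``brute_force_triplet_search``):
+
+        context = await resolve_edges_to_text(edges)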
+ """ + + def _get_nodes(retrieved_edges: list) -> dict: + def _get_title(text: str, first_n_words: int = 7, top_n_words: int = 3) -> str: + def _top_n_words(text, stop_words=None, top_n=3, separator=", "): + """Concatenates the top N frequent words in text.""" + if stop_words is None: + from cognee.modules.retrieval.utils.stop_words import DEFAULT_STOP_WORDS + + stop_words = DEFAULT_STOP_WORDS + + import string + + words = [word.lower().strip(string.punctuation) for word in text.split()] + + if stop_words: + words = [word for word in words if word and word not in stop_words] + + from collections import Counter + + top_words = [word for word, freq in Counter(words).most_common(top_n)] + + return separator.join(top_words) + + """Creates a title, by combining first words with most frequent words from the text.""" + first_n_words = text.split()[:first_n_words] + top_n_words = _top_n_words(text, top_n=top_n_words) + return f"{' '.join(first_n_words)}... [{top_n_words}]" + + """Creates a dictionary of nodes with their names and content.""" + nodes = {} + for edge in retrieved_edges: + for node in (edge.node1, edge.node2): + if node.id not in nodes: + text = node.attributes.get("text") + if text: + name = _get_title(text) + content = text + else: + name = node.attributes.get("name", "Unnamed Node") + content = node.attributes.get("description", name) + nodes[node.id] = {"node": node, "name": name, "content": content} + return nodes + + nodes = _get_nodes(retrieved_edges) + node_section = "\n".join( + f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n" + for info in nodes.values() + ) + connection_section = "\n".join( + f"{nodes[edge.node1.id]['name']} --[{edge.attributes['relationship_type']}]--> {nodes[edge.node2.id]['name']}" + for edge in retrieved_edges + ) + return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}" diff --git a/cognee/modules/retrieval/graph_completion_retriever.py b/cognee/modules/retrieval/graph_completion_retriever.py index 6a5193c56..bc4fa27b3 100644 --- a/cognee/modules/retrieval/graph_completion_retriever.py +++ b/cognee/modules/retrieval/graph_completion_retriever.py @@ -5,6 +5,7 @@ import string from cognee.infrastructure.engine import DataPoint from cognee.tasks.storage import add_data_points +from cognee.modules.graph.utils import resolve_edges_to_text from cognee.modules.graph.utils.convert_node_to_data_point import get_all_subclasses from cognee.modules.retrieval.base_retriever import BaseRetriever from cognee.modules.retrieval.utils.brute_force_triplet_search import brute_force_triplet_search @@ -53,22 +54,6 @@ class GraphCompletionRetriever(BaseRetriever): self.node_type = node_type self.node_name = node_name - def _get_nodes(self, retrieved_edges: list) -> dict: - """Creates a dictionary of nodes with their names and content.""" - nodes = {} - for edge in retrieved_edges: - for node in (edge.node1, edge.node2): - if node.id not in nodes: - text = node.attributes.get("text") - if text: - name = self._get_title(text) - content = text - else: - name = node.attributes.get("name", "Unnamed Node") - content = node.attributes.get("description", name) - nodes[node.id] = {"node": node, "name": name, "content": content} - return nodes - async def resolve_edges_to_text(self, retrieved_edges: list) -> str: """ Converts retrieved graph edges into a human-readable string format. @@ -83,16 +68,7 @@ class GraphCompletionRetriever(BaseRetriever): - str: A formatted string representation of the nodes and their connections. 
""" - nodes = self._get_nodes(retrieved_edges) - node_section = "\n".join( - f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n" - for info in nodes.values() - ) - connection_section = "\n".join( - f"{nodes[edge.node1.id]['name']} --[{edge.attributes['relationship_type']}]--> {nodes[edge.node2.id]['name']}" - for edge in retrieved_edges - ) - return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}" + return await resolve_edges_to_text(retrieved_edges) async def get_triplets(self, query: str) -> list: """ @@ -196,26 +172,6 @@ class GraphCompletionRetriever(BaseRetriever): return [completion] - def _top_n_words(self, text, stop_words=None, top_n=3, separator=", "): - """Concatenates the top N frequent words in text.""" - if stop_words is None: - stop_words = DEFAULT_STOP_WORDS - - words = [word.lower().strip(string.punctuation) for word in text.split()] - - if stop_words: - words = [word for word in words if word and word not in stop_words] - - top_words = [word for word, freq in Counter(words).most_common(top_n)] - - return separator.join(top_words) - - def _get_title(self, text: str, first_n_words: int = 7, top_n_words: int = 3) -> str: - """Creates a title, by combining first words with most frequent words from the text.""" - first_n_words = text.split()[:first_n_words] - top_n_words = self._top_n_words(text, top_n=top_n_words) - return f"{' '.join(first_n_words)}... [{top_n_words}]" - async def save_qa(self, question: str, answer: str, context: str, triplets: List) -> None: """ Saves a question and answer pair for later analysis or storage. diff --git a/cognee/tasks/codingagents/coding_rule_associations.py b/cognee/tasks/codingagents/coding_rule_associations.py index 6971ecc83..e722e7728 100644 --- a/cognee/tasks/codingagents/coding_rule_associations.py +++ b/cognee/tasks/codingagents/coding_rule_associations.py @@ -96,10 +96,14 @@ async def add_rule_associations( user_prompt_location: str = "coding_rule_association_agent_user.txt", system_prompt_location: str = "coding_rule_association_agent_system.txt", ): + if isinstance(data, list): + # If data is a list of strings join all strings in list + data = " ".join(data) + graph_engine = await get_graph_engine() existing_rules = await get_existing_rules(rules_nodeset_name=rules_nodeset_name) - user_context = {"user data": data, "rules": existing_rules} + user_context = {"chat": data, "rules": existing_rules} user_prompt = LLMGateway.render_prompt(user_prompt_location, context=user_context) system_prompt = LLMGateway.render_prompt(system_prompt_location, context={}) diff --git a/cognee/tasks/memify/__init__.py b/cognee/tasks/memify/__init__.py index d2e0172f6..692bac443 100644 --- a/cognee/tasks/memify/__init__.py +++ b/cognee/tasks/memify/__init__.py @@ -1 +1,2 @@ from .extract_subgraph import extract_subgraph +from .extract_subgraph_chunks import extract_subgraph_chunks diff --git a/cognee/tasks/memify/extract_subgraph.py b/cognee/tasks/memify/extract_subgraph.py index 198a5b367..d6ca3773f 100644 --- a/cognee/tasks/memify/extract_subgraph.py +++ b/cognee/tasks/memify/extract_subgraph.py @@ -1,7 +1,7 @@ -from cognee.modules.retrieval.utils.brute_force_triplet_search import get_memory_fragment +from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph -async def extract_subgraph(subgraphs): +async def extract_subgraph(subgraphs: list[CogneeGraph]): for subgraph in subgraphs: - for edge in subgraph: + for edge in subgraph.edges: yield edge diff --git 
a/cognee/tasks/memify/extract_subgraph_chunks.py b/cognee/tasks/memify/extract_subgraph_chunks.py new file mode 100644 index 000000000..9aab498d7 --- /dev/null +++ b/cognee/tasks/memify/extract_subgraph_chunks.py @@ -0,0 +1,11 @@ +from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph + + +async def extract_subgraph_chunks(subgraphs: list[CogneeGraph]): + """ + Get all Document Chunks from subgraphs and forward to next task in pipeline + """ + for subgraph in subgraphs: + for node in subgraph.nodes.values(): + if node.attributes["type"] == "DocumentChunk": + yield node.attributes["text"] diff --git a/examples/python/memify_coding_agent_example.py b/examples/python/memify_coding_agent_example.py index 70064c346..004a840f8 100644 --- a/examples/python/memify_coding_agent_example.py +++ b/examples/python/memify_coding_agent_example.py @@ -1,7 +1,18 @@ import asyncio +import pathlib +import os + import cognee +from cognee.api.v1.visualize.visualize import visualize_graph from cognee.shared.logging_utils import setup_logging, ERROR -from cognee.api.v1.search import SearchType +from cognee.api.v1.cognify.memify import memify +from cognee.modules.pipelines.tasks.task import Task +from cognee.tasks.memify.extract_subgraph import extract_subgraph +from cognee.modules.graph.utils import resolve_edges_to_text +from cognee.tasks.codingagents.coding_rule_associations import ( + add_rule_associations, + get_existing_rules, +) # Prerequisites: # 1. Copy `.env.template` and rename it to `.env`. @@ -38,14 +49,10 @@ async def main(): await cognee.cognify() print("Cognify process complete.\n") - from cognee.api.v1.cognify.memify import memify + subgraph_extraction_tasks = [Task(extract_subgraph)] - from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph - from cognee.tasks.codingagents.coding_rule_associations import add_rule_associations - from cognee.modules.pipelines.tasks.task import Task - - memify_tasks = [ - Task(CogneeGraph.resolve_edges_to_text, task_config={"batch_size": 10}), + rule_association_tasks = [ + Task(resolve_edges_to_text, task_config={"batch_size": 10}), Task( add_rule_associations, rules_nodeset_name="coding_agent_rules", @@ -54,11 +61,14 @@ async def main(): ), ] - await memify(tasks=memify_tasks, node_name=["coding_rules"]) + await memify( + preprocessing_tasks=subgraph_extraction_tasks, + processing_tasks=rule_association_tasks, + node_name=["coding_rules"], + ) - import os - import pathlib - from cognee.api.v1.visualize.visualize import visualize_graph + developer_rules = await get_existing_rules(rules_nodeset_name="coding_agent_rules") + print(developer_rules) file_path = os.path.join( pathlib.Path(__file__).parent, ".artifacts", "graph_visualization.html" diff --git a/examples/python/memify_coding_agent_example_chunks.py b/examples/python/memify_coding_agent_example_chunks.py new file mode 100644 index 000000000..b07bcb815 --- /dev/null +++ b/examples/python/memify_coding_agent_example_chunks.py @@ -0,0 +1,106 @@ +import asyncio +import pathlib +import os + +import cognee +from cognee.api.v1.visualize.visualize import visualize_graph +from cognee.shared.logging_utils import setup_logging, ERROR +from cognee.api.v1.cognify.memify import memify +from cognee.modules.pipelines.tasks.task import Task +from cognee.tasks.memify.extract_subgraph_chunks import extract_subgraph_chunks +from cognee.tasks.codingagents.coding_rule_associations import ( + add_rule_associations, + get_existing_rules, +) + +# Prerequisites: +# 1. 
Copy `.env.template` and rename it to `.env`.
+# 2. Add your OpenAI API key to the `.env` file in the `LLM_API_KEY` field:
+#    LLM_API_KEY = "your_key_here"
+
+
+async def main():
+    # Create a clean slate for cognee -- reset data and system state
+    print("Resetting cognee data...")
+    await cognee.prune.prune_data()
+    await cognee.prune.prune_system(metadata=True)
+    print("Data reset complete.\n")
+    print("Adding conversation about rules to cognee:\n")
+
+    coding_rules_chat_from_principal_engineer = """
+    We want code to be formatted by PEP8 standards.
+    Typing and Docstrings must be added.
+    Please also make sure to write NOTE: on all more complex code segments.
+    If there is any duplicate code, try to handle it in one function to avoid code duplication.
+    Susan should also always review new code changes before merging to main.
+    New releases should not happen on Friday so we don't have to fix them during the weekend.
+    """
+    print(
+        f"Coding rules conversation with principal engineer: {coding_rules_chat_from_principal_engineer}"
+    )
+
+    coding_rules_chat_from_manager = """
+    Susan should always review new code changes before merging to main.
+    New releases should not happen on Friday so we don't have to fix them during the weekend.
+    """
+    print(f"Coding rules conversation with manager: {coding_rules_chat_from_manager}")
+
+    # Add the text, and make it available for cognify
+    await cognee.add([coding_rules_chat_from_principal_engineer, coding_rules_chat_from_manager])
+    print("Text added successfully.\n")
+
+    # Use LLMs and cognee to create knowledge graph
+    await cognee.cognify()
+    print("Cognify process complete.\n")
+
+    # Visualize graph after cognification
+    file_path = os.path.join(
+        pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_only_cognify.html"
+    )
+    await visualize_graph(file_path)
+    print(f"Open file to see graph visualization only after cognification: {file_path}")
+
+    # After graph is created, create a second pipeline that will go through the graph and enhance it with specific
+    # coding rule nodes
+
+    # extract_subgraph_chunks is a function that returns all document chunks from specified subgraphs (if no subgraph is specified the whole graph will be sent through memify)
+    subgraph_extraction_tasks = [Task(extract_subgraph_chunks)]
+
+    # add_rule_associations is a function that handles processing coding rules from chunks and keeps track of
+    # existing rules so duplicate rules won't be created. As a result of this processing new Rule nodes will be created
+    # in the graph that specify coding rules found in conversations.
+    coding_rules_association_tasks = [
+        Task(
+            add_rule_associations,
+            rules_nodeset_name="coding_agent_rules",
+            task_config={"batch_size": 1},
+        ),
+    ]
+
+    # Memify accepts these tasks and orchestrates forwarding of graph data through these tasks (if data is not specified). 
+    # If data is explicitly specified in the arguments this specified data will be forwarded through the tasks instead
+    await memify(
+        data_streaming_tasks=subgraph_extraction_tasks,
+        data_processing_tasks=coding_rules_association_tasks,
+    )
+
+    # Find the new specific coding rules added to graph through memify (created based on chat conversation between team members)
+    developer_rules = await get_existing_rules(rules_nodeset_name="coding_agent_rules")
+    print(developer_rules)
+
+    # Visualize new graph with added memify context
+    file_path = os.path.join(
+        pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_after_memify.html"
+    )
+    await visualize_graph(file_path)
+    print(f"Open file to see graph visualization after memify enhancement: {file_path}")
+
+
+if __name__ == "__main__":
+    logger = setup_logging(log_level=ERROR)
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+    try:
+        loop.run_until_complete(main())
+    finally:
+        loop.run_until_complete(loop.shutdown_asyncgens())
From 90ef8c30d211bd8de3861063b0a2144cedeb2588 Mon Sep 17 00:00:00 2001
From: Igor Ilic
Date: Wed, 3 Sep 2025 16:16:55 +0200
Subject: [PATCH 05/14] refactor: Rename tasks

---
 cognee/api/v1/cognify/memify.py                   | 12 ++++++------
 examples/python/memify_coding_agent_example.py    |  4 ++--
 .../python/memify_coding_agent_example_chunks.py  |  4 ++--
 3 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/cognee/api/v1/cognify/memify.py b/cognee/api/v1/cognify/memify.py
index 8237059ec..3c5f7be0f 100644
--- a/cognee/api/v1/cognify/memify.py
+++ b/cognee/api/v1/cognify/memify.py
@@ -23,9 +23,9 @@ logger = get_logger("memify")


 async def memify(
-    preprocessing_tasks: List[Task],
-    processing_tasks: List[Task] = [],
-    postprocessing_tasks: List[Task] = [],
+    data_streaming_tasks: List[Task],
+    data_processing_tasks: List[Task] = [],
+    data_persistence_tasks: List[Task] = [],
     data: Optional[Any] = None,
@@ -66,9 +66,9 @@
     memify_tasks = [
-        *preprocessing_tasks,  # Unpack tasks provided to memify pipeline
-        *processing_tasks,
-        *postprocessing_tasks,
+        *data_streaming_tasks,  # Unpack tasks provided to memify pipeline
+        *data_processing_tasks,
+        *data_persistence_tasks,
     ]
diff --git a/examples/python/memify_coding_agent_example.py b/examples/python/memify_coding_agent_example.py
index 004a840f8..c0bda215a 100644
--- a/examples/python/memify_coding_agent_example.py
+++ b/examples/python/memify_coding_agent_example.py
@@ -62,8 +62,8 @@ async def main():
     await memify(
-        preprocessing_tasks=subgraph_extraction_tasks,
-        processing_tasks=rule_association_tasks,
+        data_streaming_tasks=subgraph_extraction_tasks,
+        data_processing_tasks=rule_association_tasks,
         node_name=["coding_rules"],
     )
diff --git a/examples/python/memify_coding_agent_example_chunks.py b/examples/python/memify_coding_agent_example_chunks.py
index b07bcb815..639b97396 100644
--- a/examples/python/memify_coding_agent_example_chunks.py
+++ b/examples/python/memify_coding_agent_example_chunks.py
@@ -80,8 +80,8 @@
     # Memify accepts these tasks and orchestrates forwarding of graph data through these tasks (if data is not specified). 
     # If data is explicitly specified in the arguments this specified data will be forwarded through the tasks instead
     await memify(
-        data_streaming_tasks=subgraph_extraction_tasks,
-        data_processing_tasks=coding_rules_association_tasks,
+        data_streaming_tasks=subgraph_extraction_tasks,
+        data_processing_tasks=coding_rules_association_tasks,
     )
From 0e3a10d925fffdb769b1e31fedd35a4460715aa7 Mon Sep 17 00:00:00 2001
From: Igor Ilic
Date: Wed, 3 Sep 2025 17:49:33 +0200
Subject: [PATCH 06/14] refactor: Change input task names

---
 cognee/api/v1/cognify/memify.py                   | 15 +--
 ...y_coding_rule_association_agent_system.txt     |  6 -
 ...ify_coding_rule_association_agent_user.txt     |  6 -
 .../python/memify_coding_agent_example.py         | 66 +++++++----
 .../memify_coding_agent_example_chunks.py         | 106 ------------------
 5 files changed, 51 insertions(+), 148 deletions(-)
 delete mode 100644 cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt
 delete mode 100644 cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_user.txt
 delete mode 100644 examples/python/memify_coding_agent_example_chunks.py

diff --git a/cognee/api/v1/cognify/memify.py b/cognee/api/v1/cognify/memify.py
index 3c5f7be0f..86f84626a 100644
--- a/cognee/api/v1/cognify/memify.py
+++ b/cognee/api/v1/cognify/memify.py
@@ -1,5 +1,4 @@
 from typing import Union, Optional, List, Type, Any
-from dataclasses import field
 from uuid import UUID

 from cognee.shared.logging_utils import get_logger
 from cognee.modules.engine.operations.setup import setup
 from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor
+from cognee.tasks.memify.extract_subgraph_chunks import extract_subgraph_chunks
+from cognee.tasks.codingagents.coding_rule_associations import (
+    add_rule_associations,
+)

 logger = get_logger("memify")


 async def memify(
-    data_streaming_tasks: List[Task],
-    data_processing_tasks: List[Task] = [],
-    data_persistence_tasks: List[Task] = [],
+    extraction_tasks: List[Task] = [Task(extract_subgraph_chunks)],
+    enrichment_tasks: List[Task] = [Task(add_rule_associations)],
     data: Optional[Any] = None,
     datasets: Union[str, list[str], list[UUID]] = None,
     user: User = None,
     node_type: Optional[Type] = NodeSet,
     node_name: Optional[List[str]] = None,

     memify_tasks = [
-        *data_streaming_tasks,  # Unpack tasks provided to memify pipeline
-        *data_processing_tasks,
-        *data_persistence_tasks,
+        *extraction_tasks,  # Unpack tasks provided to memify pipeline
+        *enrichment_tasks,
     ]
diff --git a/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt b/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt
deleted file mode 100644
index d9adf45f7..000000000
--- a/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_system.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-You are an association agent tasked with suggesting structured developer rules from user-agent interactions stored in a Knowledge Graph.
-You will receive the actual user agent interaction as a set of relationships from a knowledge graph separated by \n---\n each represented as node1 -- relation -- node2 triplet, and the list of the already existing developer rules.
-Each rule represents a single best practice or guideline the agent should follow in the future. 
-Suggest rules that are general and not specific to the knowledge graph relationships, strictly technical, add value and improve the future agent behavior.
-Do not suggest rules similar to the existing ones or rules that are not general and don't add value.
-It is acceptable to return an empty rule list.
diff --git a/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_user.txt b/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_user.txt
deleted file mode 100644
index 9b525c625..000000000
--- a/cognee/infrastructure/llm/prompts/memify_coding_rule_association_agent_user.txt
+++ /dev/null
@@ -1,6 +0,0 @@
-**Here is the User-agent interaction context provided with a set of relationships from a knowledge graph separated by \n---\n each represented as node1 -- relation -- node2 triplet:**
-`{{ chat }}`
-
-
-**Already existing rules:**
-`{{ rules }}`
diff --git a/examples/python/memify_coding_agent_example.py b/examples/python/memify_coding_agent_example.py
index c0bda215a..61af467d3 100644
--- a/examples/python/memify_coding_agent_example.py
+++ b/examples/python/memify_coding_agent_example.py
@@ -7,8 +7,7 @@ from cognee.api.v1.visualize.visualize import visualize_graph
 from cognee.shared.logging_utils import setup_logging, ERROR
 from cognee.api.v1.cognify.memify import memify
 from cognee.modules.pipelines.tasks.task import Task
-from cognee.tasks.memify.extract_subgraph import extract_subgraph
-from cognee.modules.graph.utils import resolve_edges_to_text
+from cognee.tasks.memify.extract_subgraph_chunks import extract_subgraph_chunks
 from cognee.tasks.codingagents.coding_rule_associations import (
     add_rule_associations,
     get_existing_rules,
 )
@@ -26,54 +25,75 @@ async def main():
     await cognee.prune.prune_data()
     await cognee.prune.prune_system(metadata=True)
     print("Data reset complete.\n")
+    print("Adding conversation about rules to cognee:\n")

-    # cognee knowledge graph will be created based on this text
-    text = """
-    Natural language processing (NLP) is an interdisciplinary
-    subfield of computer science and information retrieval.
-    """
-
-    coding_rules_text = """
-    Code must be formatted by PEP8 standards.
+    coding_rules_chat_from_principal_engineer = """
+    We want code to be formatted by PEP8 standards.
     Typing and Docstrings must be added.
+    Please also make sure to write NOTE: on all more complex code segments.
+    If there is any duplicate code, try to handle it in one function to avoid code duplication.
+    Susan should also always review new code changes before merging to main.
+    New releases should not happen on Friday so we don't have to fix them during the weekend. 
+    """
+    print(
+        f"Coding rules conversation with principal engineer: {coding_rules_chat_from_principal_engineer}"
+    )
+
+    coding_rules_chat_from_manager = """
+    Susan should always review new code changes before merging to main.
+    New releases should not happen on Friday so we don't have to fix them during the weekend.
+    """
+    print(f"Coding rules conversation with manager: {coding_rules_chat_from_manager}")

-    print("Adding text to cognee:")
-    print(text.strip())
     # Add the text, and make it available for cognify
-    await cognee.add(text)
-    await cognee.add(coding_rules_text, node_set=["coding_rules"])
+    await cognee.add([coding_rules_chat_from_principal_engineer, coding_rules_chat_from_manager])
     print("Text added successfully.\n")

     # Use LLMs and cognee to create knowledge graph
     await cognee.cognify()
     print("Cognify process complete.\n")

-    subgraph_extraction_tasks = [Task(extract_subgraph)]
+    # Visualize graph after cognification
+    file_path = os.path.join(
+        pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_only_cognify.html"
+    )
+    await visualize_graph(file_path)
+    print(f"Open file to see graph visualization only after cognification: {file_path}")

-    rule_association_tasks = [
-        Task(resolve_edges_to_text, task_config={"batch_size": 10}),
+    # After graph is created, create a second pipeline that will go through the graph and enhance it with specific
+    # coding rule nodes
+
+    # extract_subgraph_chunks is a function that returns all document chunks from specified subgraphs (if no subgraph is specified the whole graph will be sent through memify)
+    subgraph_extraction_tasks = [Task(extract_subgraph_chunks)]
+
+    # add_rule_associations is a function that handles processing coding rules from chunks and keeps track of
+    # existing rules so duplicate rules won't be created. As a result of this processing new Rule nodes will be created
+    # in the graph that specify coding rules found in conversations.
+    coding_rules_association_tasks = [
         Task(
             add_rule_associations,
             rules_nodeset_name="coding_agent_rules",
-            user_prompt_location="memify_coding_rule_association_agent_user.txt",
-            system_prompt_location="memify_coding_rule_association_agent_system.txt",
+            task_config={"batch_size": 1},
         ),
     ]

+    # Memify accepts these tasks and orchestrates forwarding of graph data through these tasks (if data is not specified). 
+    # If data is explicitly specified in the arguments, this data will be forwarded through the tasks instead
     await memify(
-        data_streaming_tasks=subgraph_extraction_tasks,
-        data_processing_tasks=rule_association_tasks,
-        node_name=["coding_rules"],
+        extraction_tasks=subgraph_extraction_tasks,
+        enrichment_tasks=coding_rules_association_tasks,
     )
 
+    # Find the new specific coding rules added to graph through memify (created based on chat conversation between team members)
     developer_rules = await get_existing_rules(rules_nodeset_name="coding_agent_rules")
     print(developer_rules)
 
+    # Visualize new graph with added memify context
     file_path = os.path.join(
-        pathlib.Path(__file__).parent, ".artifacts", "graph_visualization.html"
+        pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_after_memify.html"
     )
     await visualize_graph(file_path)
+    print(f"Open file to see graph visualization after memify enhancement: {file_path}")
 
 
 if __name__ == "__main__":
diff --git a/examples/python/memify_coding_agent_example_chunks.py b/examples/python/memify_coding_agent_example_chunks.py
deleted file mode 100644
index 639b97396..000000000
--- a/examples/python/memify_coding_agent_example_chunks.py
+++ /dev/null
@@ -1,106 +0,0 @@
-import asyncio
-import pathlib
-import os
-
-import cognee
-from cognee.api.v1.visualize.visualize import visualize_graph
-from cognee.shared.logging_utils import setup_logging, ERROR
-from cognee.api.v1.cognify.memify import memify
-from cognee.modules.pipelines.tasks.task import Task
-from cognee.tasks.memify.extract_subgraph_chunks import extract_subgraph_chunks
-from cognee.tasks.codingagents.coding_rule_associations import (
-    add_rule_associations,
-    get_existing_rules,
-)
-
-# Prerequisites:
-# 1. Copy `.env.template` and rename it to `.env`.
-# 2. Add your OpenAI API key to the `.env` file in the `LLM_API_KEY` field:
-#    LLM_API_KEY = "your_key_here"
-
-
-async def main():
-    # Create a clean slate for cognee -- reset data and system state
-    print("Resetting cognee data...")
-    await cognee.prune.prune_data()
-    await cognee.prune.prune_system(metadata=True)
-    print("Data reset complete.\n")
-    print("Adding conversation about rules to cognee:\n")
-
-    coding_rules_chat_from_principal_engineer = """
-    We want code to be formatted by PEP8 standards.
-    Typing and Docstrings must be added.
-    Please also make sure to write NOTE: on all more complex code segments.
-    If there is any duplicate code, try to handle it in one function to avoid code duplication.
-    Susan should also always review new code changes before merging to main.
-    New releases should not happen on Friday so we don't have to fix them during the weekend.
-    """
-    print(
-        f"Coding rules conversation with principal engineer: {coding_rules_chat_from_principal_engineer}"
-    )
-
-    coding_rules_chat_from_manager = """
-    Susan should always review new code changes before merging to main.
-    New releases should not happen on Friday so we don't have to fix them during the weekend.
- """ - print(f"Coding rules conversation with manager: {coding_rules_chat_from_manager}") - - # Add the text, and make it available for cognify - await cognee.add([coding_rules_chat_from_principal_engineer, coding_rules_chat_from_manager]) - print("Text added successfully.\n") - - # Use LLMs and cognee to create knowledge graph - await cognee.cognify() - print("Cognify process complete.\n") - - # Visualize graph after cognification - file_path = os.path.join( - pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_only_cognify.html" - ) - await visualize_graph(file_path) - print(f"Open file to see graph visualization only after cognification: {file_path}") - - # After graph is created, create a second pipeline that will go through the graph and enchance it with specific - # coding rule nodes - - # extract_subgraph_chunks is a function that returns all document chunks from specified subgraphs (if no subgraph is specifed the whole graph will be sent through memify) - subgraph_extraction_tasks = [Task(extract_subgraph_chunks)] - - # add_rule_associations is a function that handles processing coding rules from chunks and keeps track of - # existing rules so duplicate rules won't be created. As the result of this processing new Rule nodes will be created - # in the graph that specify coding rules found in conversations. - coding_rules_association_tasks = [ - Task( - add_rule_associations, - rules_nodeset_name="coding_agent_rules", - task_config={"batch_size": 1}, - ), - ] - - # Memify accepts these tasks and orchestrates forwarding of graph data through these tasks (if data is not specified). - # If data is explicitely specified in the arguments this specified data will be forwarded through the tasks instead - await memify( - data_streaming_tasks=subgraph_extraction_tasks, - data_processing_tasks=coding_rules_association_tasks, - ) - - # Find the new specific coding rules added to graph through memify (created based on chat conversation between team members) - developer_rules = await get_existing_rules(rules_nodeset_name="coding_agent_rules") - print(developer_rules) - - # Visualize new graph with added memify context - file_path = os.path.join( - pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_after_memify.html" - ) - await visualize_graph(file_path) - print(f"Open file to see graph visualization after memify enhancment: {file_path}") - - -if __name__ == "__main__": - logger = setup_logging(log_level=ERROR) - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - try: - loop.run_until_complete(main()) - finally: - loop.run_until_complete(loop.shutdown_asyncgens()) From 3c50ef4d6f8e94a7c6edde0e00b66738705fe83a Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Thu, 4 Sep 2025 14:44:13 +0200 Subject: [PATCH 07/14] docs: Update docstring for memify --- cognee/api/v1/cognify/memify.py | 29 +++++++++++++---------------- 1 file changed, 13 insertions(+), 16 deletions(-) diff --git a/cognee/api/v1/cognify/memify.py b/cognee/api/v1/cognify/memify.py index 86f84626a..dd089c060 100644 --- a/cognee/api/v1/cognify/memify.py +++ b/cognee/api/v1/cognify/memify.py @@ -33,24 +33,24 @@ async def memify( user: User = None, node_type: Optional[Type] = NodeSet, node_name: Optional[List[str]] = None, - cypher_query: Optional[str] = None, - vector_db_config: dict = None, - graph_db_config: dict = None, + vector_db_config: Optional[dict] = None, + graph_db_config: Optional[dict] = None, run_in_background: bool = False, ): """ - Prerequisites: - - **LLM_API_KEY**: Must be 
configured (required for entity extraction and graph generation)
-    - **Data Added**: Must have data previously added via `cognee.add()` and `cognee.cognify()`
-    - **Vector Database**: Must be accessible for embeddings storage
-    - **Graph Database**: Must be accessible for relationship storage
-
     Args:
-        datasets: Dataset name(s) or dataset uuid to process. Processes all available data if None.
+        extraction_tasks: List of Cognee Tasks to execute for graph/data extraction.
+        enrichment_tasks: List of Cognee Tasks to handle enrichment of provided graph/data from extraction tasks.
+        data: The data to ingest. Can be anything when custom extraction and enrichment tasks are used.
+              Data provided here will be forwarded to the first extraction task in the pipeline as input.
+              If no data is provided the whole graph (or subgraph if node_name/node_type is specified) will be forwarded
+        datasets: Dataset name(s) or dataset uuid to process. Processes all available datasets if None.
             - Single dataset: "my_dataset"
             - Multiple datasets: ["docs", "research", "reports"]
            - None: Process all datasets for the user
         user: User context for authentication and data access. Uses default if None.
+        node_type: Filter graph to specific entity types (for advanced filtering). Used when no data is provided.
+        node_name: Filter graph to specific named entities (for targeted search). Used when no data is provided.
         vector_db_config: Custom vector database configuration for embeddings storage.
         graph_db_config: Custom graph database configuration for relationship storage.
         run_in_background: If True, starts processing asynchronously and returns immediately.
@@ -60,12 +60,9 @@ async def memify(
     """
 
     if not data:
-        if cypher_query:
-            pass
-        else:
-            memory_fragment = await get_memory_fragment(node_type=node_type, node_name=node_name)
-            # Subgraphs should be a single element in the list to represent one data item
-            data = [memory_fragment]
+        memory_fragment = await get_memory_fragment(node_type=node_type, node_name=node_name)
+        # Subgraphs should be a single element in the list to represent one data item
+        data = [memory_fragment]
 
     memify_tasks = [
         *extraction_tasks,  # Unpack tasks provided to memify pipeline
From c1106b76fe140f9ed4588a50ff3914e4ef2a2778 Mon Sep 17 00:00:00 2001
From: Igor Ilic
Date: Thu, 4 Sep 2025 17:53:07 +0200
Subject: [PATCH 08/14] feat: Added new coding rules search

---
 .../retrieval/coding_rules_retriever.py        | 19 +++++++++++++++++++
 cognee/modules/search/methods/search.py        |  4 ++++
 cognee/modules/search/types/SearchType.py      |  1 +
 .../codingagents/coding_rule_associations.py   |  5 +++--
 examples/python/memify_coding_agent_example.py |  9 +++++++--
 5 files changed, 34 insertions(+), 4 deletions(-)
 create mode 100644 cognee/modules/retrieval/coding_rules_retriever.py

diff --git a/cognee/modules/retrieval/coding_rules_retriever.py b/cognee/modules/retrieval/coding_rules_retriever.py
new file mode 100644
index 000000000..2578d1ee1
--- /dev/null
+++ b/cognee/modules/retrieval/coding_rules_retriever.py
@@ -0,0 +1,19 @@
+from cognee.shared.logging_utils import get_logger
+from cognee.tasks.codingagents.coding_rule_associations import get_existing_rules
+
+logger = get_logger("CodingRulesRetriever")
+
+
+class CodingRulesRetriever:
+    """Retriever for handling coding rule based searches."""
+
+    def __init__(self, rules_nodeset_name):
+        if isinstance(rules_nodeset_name, list):
+            rules_nodeset_name = rules_nodeset_name[0]
+        self.rules_nodeset_name = rules_nodeset_name
+        """Initialize retriever with search parameters."""
+
+    async def
get_existing_rules(self, query_text): + return await get_existing_rules( + rules_nodeset_name=self.rules_nodeset_name, return_list=True + ) diff --git a/cognee/modules/search/methods/search.py b/cognee/modules/search/methods/search.py index 71bf61d6b..b341e4a8a 100644 --- a/cognee/modules/search/methods/search.py +++ b/cognee/modules/search/methods/search.py @@ -13,6 +13,7 @@ from cognee.modules.retrieval.insights_retriever import InsightsRetriever from cognee.modules.retrieval.summaries_retriever import SummariesRetriever from cognee.modules.retrieval.completion_retriever import CompletionRetriever from cognee.modules.retrieval.graph_completion_retriever import GraphCompletionRetriever +from cognee.modules.retrieval.coding_rules_retriever import CodingRulesRetriever from cognee.modules.retrieval.graph_summary_completion_retriever import ( GraphSummaryCompletionRetriever, ) @@ -167,6 +168,9 @@ async def specific_search( SearchType.CYPHER: CypherSearchRetriever().get_completion, SearchType.NATURAL_LANGUAGE: NaturalLanguageRetriever().get_completion, SearchType.FEEDBACK: UserQAFeedback(last_k=last_k).add_feedback, + SearchType.CODING_RULES: CodingRulesRetriever( + rules_nodeset_name=node_name + ).get_existing_rules, } # If the query type is FEELING_LUCKY, select the search type intelligently diff --git a/cognee/modules/search/types/SearchType.py b/cognee/modules/search/types/SearchType.py index c1f0521b2..0a7cae63a 100644 --- a/cognee/modules/search/types/SearchType.py +++ b/cognee/modules/search/types/SearchType.py @@ -15,3 +15,4 @@ class SearchType(Enum): GRAPH_COMPLETION_CONTEXT_EXTENSION = "GRAPH_COMPLETION_CONTEXT_EXTENSION" FEELING_LUCKY = "FEELING_LUCKY" FEEDBACK = "FEEDBACK" + CODING_RULES = "CODING_RULES" diff --git a/cognee/tasks/codingagents/coding_rule_associations.py b/cognee/tasks/codingagents/coding_rule_associations.py index e722e7728..c809bc68f 100644 --- a/cognee/tasks/codingagents/coding_rule_associations.py +++ b/cognee/tasks/codingagents/coding_rule_associations.py @@ -31,7 +31,7 @@ class RuleSet(DataPoint): ) -async def get_existing_rules(rules_nodeset_name: str) -> str: +async def get_existing_rules(rules_nodeset_name: str, return_list: bool = False) -> str: graph_engine = await get_graph_engine() nodes_data, _ = await graph_engine.get_nodeset_subgraph( node_type=NodeSet, node_name=[rules_nodeset_name] @@ -46,7 +46,8 @@ async def get_existing_rules(rules_nodeset_name: str) -> str: and "text" in item[1] ] - existing_rules = "\n".join(f"- {rule}" for rule in existing_rules) + if not return_list: + existing_rules = "\n".join(f"- {rule}" for rule in existing_rules) return existing_rules diff --git a/examples/python/memify_coding_agent_example.py b/examples/python/memify_coding_agent_example.py index 61af467d3..7f8c58802 100644 --- a/examples/python/memify_coding_agent_example.py +++ b/examples/python/memify_coding_agent_example.py @@ -85,8 +85,13 @@ async def main(): ) # Find the new specific coding rules added to graph through memify (created based on chat conversation between team members) - developer_rules = await get_existing_rules(rules_nodeset_name="coding_agent_rules") - print(developer_rules) + print( + await cognee.search( + query_text="List me the coding rules", + query_type=cognee.SearchType.CODING_RULES, + node_name=["coding_agent_rules"], + ) + ) # Visualize new graph with added memify context file_path = os.path.join( From 95bafd942c8b0553f45a7987492ac8cbf6e5ad86 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Thu, 4 Sep 2025 18:06:02 +0200 Subject: [PATCH 
09/14] feat: add coding rule search type

---
 examples/python/memify_coding_agent_example.py | 13 +++++++------
 1 file changed, 7 insertions(+), 6 deletions(-)

diff --git a/examples/python/memify_coding_agent_example.py b/examples/python/memify_coding_agent_example.py
index 7f8c58802..0238cf775 100644
--- a/examples/python/memify_coding_agent_example.py
+++ b/examples/python/memify_coding_agent_example.py
@@ -85,14 +85,15 @@ async def main():
     )
 
     # Find the new specific coding rules added to graph through memify (created based on chat conversation between team members)
-    print(
-        await cognee.search(
-            query_text="List me the coding rules",
-            query_type=cognee.SearchType.CODING_RULES,
-            node_name=["coding_agent_rules"],
-        )
+    coding_rules = await cognee.search(
+        query_text="List me the coding rules",
+        query_type=cognee.SearchType.CODING_RULES,
+        node_name=["coding_agent_rules"],
     )
 
+    for coding_rule in coding_rules:
+        print(coding_rule)
+
     # Visualize new graph with added memify context
     file_path = os.path.join(
         pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_after_memify.html"
From b0d4503f2b3252e1d8c56ec98644d72c219abb31 Mon Sep 17 00:00:00 2001
From: Igor Ilic
Date: Thu, 4 Sep 2025 18:12:59 +0200
Subject: [PATCH 10/14] refactor: Move memify out of api folder

---
 cognee/__init__.py                                  | 1 +
 cognee/modules/memify/__init__.py                   | 1 +
 cognee/{api/v1/cognify => modules/memify}/memify.py | 0
 examples/python/memify_coding_agent_example.py      | 7 ++-----
 4 files changed, 4 insertions(+), 5 deletions(-)
 create mode 100644 cognee/modules/memify/__init__.py
 rename cognee/{api/v1/cognify => modules/memify}/memify.py (100%)

diff --git a/cognee/__init__.py b/cognee/__init__.py
index 7aa6388d9..be5a16b3b 100644
--- a/cognee/__init__.py
+++ b/cognee/__init__.py
@@ -18,6 +18,7 @@ logger = setup_logging()
 from .api.v1.add import add
 from .api.v1.delete import delete
 from .api.v1.cognify import cognify
+from .modules.memify import memify
 from .api.v1.config.config import config
 from .api.v1.datasets.datasets import datasets
 from .api.v1.prune import prune
diff --git a/cognee/modules/memify/__init__.py b/cognee/modules/memify/__init__.py
new file mode 100644
index 000000000..90aaa8404
--- /dev/null
+++ b/cognee/modules/memify/__init__.py
@@ -0,0 +1 @@
+from .memify import memify
diff --git a/cognee/api/v1/cognify/memify.py b/cognee/modules/memify/memify.py
similarity index 100%
rename from cognee/api/v1/cognify/memify.py
rename to cognee/modules/memify/memify.py
diff --git a/examples/python/memify_coding_agent_example.py b/examples/python/memify_coding_agent_example.py
index 0238cf775..17bf8fc0e 100644
--- a/examples/python/memify_coding_agent_example.py
+++ b/examples/python/memify_coding_agent_example.py
@@ -3,15 +3,12 @@ import pathlib
 import os
 
 import cognee
+from cognee import memify
 from cognee.api.v1.visualize.visualize import visualize_graph
 from cognee.shared.logging_utils import setup_logging, ERROR
-from cognee.api.v1.cognify.memify import memify
 from cognee.modules.pipelines.tasks.task import Task
 from cognee.tasks.memify.extract_subgraph_chunks import extract_subgraph_chunks
-from cognee.tasks.codingagents.coding_rule_associations import (
-    add_rule_associations,
-    get_existing_rules,
-)
+from cognee.tasks.codingagents.coding_rule_associations import add_rule_associations
 
 # Prerequisites:
 # 1. Copy `.env.template` and rename it to `.env`.
 # 2. Add your OpenAI API key to the `.env` file in the `LLM_API_KEY` field:
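
For reference, the example above boils down to a minimal end-to-end sketch of the relocated entry point. This is a sketch under the assumptions of this series (the default coding-rule tasks from the example and an LLM_API_KEY configured in the environment), not an authoritative recipe:

import asyncio

import cognee
from cognee import memify  # re-exported from the package root as of this patch
from cognee.modules.pipelines.tasks.task import Task
from cognee.tasks.memify.extract_subgraph_chunks import extract_subgraph_chunks
from cognee.tasks.codingagents.coding_rule_associations import add_rule_associations


async def run():
    # memify enriches an existing graph, so ingest and cognify first
    await cognee.add("We want code to be formatted by PEP8 standards.")
    await cognee.cognify()

    # extraction streams document chunks out of the graph;
    # enrichment turns them into Rule nodes in the "coding_agent_rules" node set
    await memify(
        extraction_tasks=[Task(extract_subgraph_chunks)],
        enrichment_tasks=[
            Task(
                add_rule_associations,
                rules_nodeset_name="coding_agent_rules",
                task_config={"batch_size": 1},
            )
        ],
    )


asyncio.run(run())
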
From 805f443cd6e88e6a9ae68f3ddaa2594982488c65 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Thu, 4 Sep 2025 19:08:55 +0200 Subject: [PATCH 11/14] feat: Add memify router --- cognee/api/client.py | 3 + cognee/api/v1/add/routers/get_add_router.py | 6 -- cognee/api/v1/memify/__init__.py | 0 cognee/api/v1/memify/routers/__init__.py | 1 + .../v1/memify/routers/get_memify_router.py | 99 +++++++++++++++++++ cognee/modules/memify/memify.py | 13 ++- .../python/memify_coding_agent_example.py | 7 +- 7 files changed, 118 insertions(+), 11 deletions(-) create mode 100644 cognee/api/v1/memify/__init__.py create mode 100644 cognee/api/v1/memify/routers/__init__.py create mode 100644 cognee/api/v1/memify/routers/get_memify_router.py diff --git a/cognee/api/client.py b/cognee/api/client.py index 215e4a17e..d6bd71d5f 100644 --- a/cognee/api/client.py +++ b/cognee/api/client.py @@ -22,6 +22,7 @@ from cognee.api.v1.settings.routers import get_settings_router from cognee.api.v1.datasets.routers import get_datasets_router from cognee.api.v1.cognify.routers import get_code_pipeline_router, get_cognify_router from cognee.api.v1.search.routers import get_search_router +from cognee.api.v1.memify.routers import get_memify_router from cognee.api.v1.add.routers import get_add_router from cognee.api.v1.delete.routers import get_delete_router from cognee.api.v1.responses.routers import get_responses_router @@ -230,6 +231,8 @@ app.include_router(get_add_router(), prefix="/api/v1/add", tags=["add"]) app.include_router(get_cognify_router(), prefix="/api/v1/cognify", tags=["cognify"]) +app.include_router(get_memify_router(), prefix="/api/v1/memify", tags=["memify"]) + app.include_router(get_search_router(), prefix="/api/v1/search", tags=["search"]) app.include_router( diff --git a/cognee/api/v1/add/routers/get_add_router.py b/cognee/api/v1/add/routers/get_add_router.py index 1703d9931..9de818b7d 100644 --- a/cognee/api/v1/add/routers/get_add_router.py +++ b/cognee/api/v1/add/routers/get_add_router.py @@ -1,6 +1,3 @@ -import os -import requests -import subprocess from uuid import UUID from fastapi import APIRouter @@ -60,9 +57,6 @@ def get_add_router() -> APIRouter: ## Notes - To add data to datasets not owned by the user, use dataset_id (when ENABLE_BACKEND_ACCESS_CONTROL is set to True) - - GitHub repositories are cloned and all files are processed - - HTTP URLs are fetched and their content is processed - - The ALLOW_HTTP_REQUESTS environment variable controls URL processing - datasetId value can only be the UUID of an already existing dataset """ send_telemetry( diff --git a/cognee/api/v1/memify/__init__.py b/cognee/api/v1/memify/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/cognee/api/v1/memify/routers/__init__.py b/cognee/api/v1/memify/routers/__init__.py new file mode 100644 index 000000000..1d1793c35 --- /dev/null +++ b/cognee/api/v1/memify/routers/__init__.py @@ -0,0 +1 @@ +from .get_memify_router import get_memify_router diff --git a/cognee/api/v1/memify/routers/get_memify_router.py b/cognee/api/v1/memify/routers/get_memify_router.py new file mode 100644 index 000000000..edac2775a --- /dev/null +++ b/cognee/api/v1/memify/routers/get_memify_router.py @@ -0,0 +1,99 @@ +from uuid import UUID + +from fastapi import APIRouter +from fastapi.responses import JSONResponse +from fastapi import Depends +from pydantic import Field +from typing import List, Optional + +from cognee.api.DTO import InDTO +from cognee.modules.users.models import User +from cognee.modules.users.methods import 
get_authenticated_user
+from cognee.shared.utils import send_telemetry
+from cognee.modules.pipelines.models import PipelineRunErrored
+from cognee.shared.logging_utils import get_logger
+
+logger = get_logger()
+
+
+class MemifyPayloadDTO(InDTO):
+    extraction_tasks: List[str] = Field(
+        default=None,
+        examples=[[]],
+    )
+    enrichment_tasks: List[str] = (Field(default=None, examples=[[]]),)
+    data: Optional[str] = (Field(default=None),)
+    dataset_names: Optional[List[str]] = Field(default=None)
+    dataset_ids: Optional[List[UUID]] = Field(default=None, examples=[[]])
+    node_name: Optional[List[str]] = Field(default=None)
+    run_in_background: Optional[bool] = Field(default=False)
+
+
+def get_memify_router() -> APIRouter:
+    router = APIRouter()
+
+    @router.post("", response_model=dict)
+    async def memify(payload: MemifyPayloadDTO, user: User = Depends(get_authenticated_user)):
+        """
+        Enrichment pipeline in Cognee that can work with already built graphs. If no data is provided, the existing knowledge graph will be used as data;
+        custom data can also be provided instead and processed with the provided extraction and enrichment tasks.
+
+        Provided tasks and data will be arranged to run the Cognee pipeline and execute graph enrichment/creation.
+
+        ## Request Parameters
+        - **extractionTasks** Optional[List[str]]: List of available Cognee Tasks to execute for graph/data extraction.
+        - **enrichmentTasks** Optional[List[str]]: List of available Cognee Tasks to handle enrichment of provided graph/data from extraction tasks.
+        - **data** Optional[List[str]]: The data to ingest. Can be any text data when custom extraction and enrichment tasks are used.
+          Data provided here will be forwarded to the first extraction task in the pipeline as input.
+          If no data is provided the whole graph (or subgraph if node_name/node_type is specified) will be forwarded
+        - **dataset_names** (Optional[List[str]]): Name of the datasets to memify
+        - **dataset_ids** (Optional[List[UUID]]): List of UUIDs of an already existing dataset
+        - **node_name** (Optional[List[str]]): Filter graph to specific named entities (for targeted search). Used when no data is provided.
+        - **run_in_background** (Optional[bool]): Whether to execute processing asynchronously. Defaults to False (blocking).
+
+        Either datasetName or datasetId must be provided.
+
+        ## Response
+        Returns information about the memify operation containing:
+        - Status of the operation
+        - Details about the processed data
+        - Any relevant metadata from the ingestion process
+
+        ## Error Codes
+        - **400 Bad Request**: Neither datasetId nor datasetName provided
+        - **409 Conflict**: Error during memify operation
+        - **403 Forbidden**: User doesn't have permission to use dataset
+
+        ## Notes
+        - To memify datasets not owned by the user, use dataset_id (when ENABLE_BACKEND_ACCESS_CONTROL is set to True)
+        - datasetId value can only be the UUID of an already existing dataset
+        """
+
+        send_telemetry(
+            "Memify API Endpoint Invoked",
+            user.id,
+            additional_properties={"endpoint": "POST /v1/memify"},
+        )
+
+        if not payload.dataset_ids and not payload.dataset_names:
+            raise ValueError("Either datasetId or datasetName must be provided.")
+
+        from cognee import memify
+
+        try:
+            memify_run = await memify(
+                extraction_tasks=payload.extraction_tasks,
+                enrichment_tasks=payload.enrichment_tasks,
+                data=payload.data,
+                datasets=payload.dataset_ids if payload.dataset_ids else payload.dataset_names,
+                node_name=payload.node_name,
+                user=user,
+            )
+
+            if isinstance(memify_run, PipelineRunErrored):
+                return JSONResponse(status_code=420, content=memify_run)
+            return memify_run
+        except Exception as error:
+            return JSONResponse(status_code=409, content={"error": str(error)})
+
+    return router
diff --git a/cognee/modules/memify/memify.py b/cognee/modules/memify/memify.py
index dd089c060..80afd7325 100644
--- a/cognee/modules/memify/memify.py
+++ b/cognee/modules/memify/memify.py
@@ -26,8 +26,8 @@ logger = get_logger("memify")
 async def memify(
-    extraction_tasks: List[Task] = [Task(extract_subgraph_chunks)],
-    enrichment_tasks: List[Task] = [Task(add_rule_associations)],
+    extraction_tasks: Union[List[Task], List[str]] = [Task(extract_subgraph_chunks)],
+    enrichment_tasks: Union[List[Task], List[str]] = [Task(add_rule_associations)],
     data: Optional[Any] = None,
     datasets: Union[str, list[str], list[UUID]] = None,
     user: User = None,
@@ -38,6 +38,15 @@ async def memify(
     run_in_background: bool = False,
 ):
     """
+    Enrichment pipeline in Cognee that can work with already built graphs. If no data is provided, the existing knowledge graph will be used as data;
+    custom data can also be provided instead and processed with the provided extraction and enrichment tasks.
+
+    Provided tasks and data will be arranged to run the Cognee pipeline and execute graph enrichment/creation.
+
+    Whereas the initial cognify step converts raw text and documents into a knowledge
+    graph, memify operates on top of that graph: it analyzes existing content, extracts
+    additional entities and relationships, and creates semantic connections for enhanced search and reasoning.
+
     Args:
         extraction_tasks: List of Cognee Tasks to execute for graph/data extraction.
         enrichment_tasks: List of Cognee Tasks to handle enrichment of provided graph/data from extraction tasks.
diff --git a/examples/python/memify_coding_agent_example.py b/examples/python/memify_coding_agent_example.py
index 17bf8fc0e..1fd3b1528 100644
--- a/examples/python/memify_coding_agent_example.py
+++ b/examples/python/memify_coding_agent_example.py
@@ -55,7 +55,7 @@ async def main():
         pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_only_cognify.html"
     )
     await visualize_graph(file_path)
-    print(f"Open file to see graph visualization only after cognification: {file_path}")
+    print(f"Open file to see graph visualization only after cognification: {file_path}\n")
 
     # After graph is created, create a second pipeline that will go through the graph and enhance it with specific
     # coding rule nodes
@@ -88,15 +88,16 @@ async def main():
         node_name=["coding_agent_rules"],
     )
 
+    print("Coding rules created by memify:")
     for coding_rule in coding_rules:
-        print(coding_rule)
+        print("- " + coding_rule)
 
     # Visualize new graph with added memify context
     file_path = os.path.join(
         pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_after_memify.html"
     )
     await visualize_graph(file_path)
-    print(f"Open file to see graph visualization after memify enhancement: {file_path}")
+    print(f"\nOpen file to see graph visualization after memify enhancement: {file_path}")
 
 
 if __name__ == "__main__":
From e06cf11f49d2a574e0906d32dd022767a2d7cdd9 Mon Sep 17 00:00:00 2001
From: Igor Ilic
Date: Thu, 4 Sep 2025 19:53:59 +0200
Subject: [PATCH 12/14] fix: Resolve import issue with creating auth dataset

---
 cognee/api/v1/add/routers/get_add_router.py      |  2 +-
 .../api/v1/memify/routers/get_memify_router.py   | 16 ++++++++--------
 .../data/methods/load_or_create_datasets.py      |  2 +-
 cognee/modules/memify/memify.py                  | 16 ++++++++++++++--
 .../modules/retrieval/coding_rules_retriever.py  |  5 ++++-
 5 files changed, 28 insertions(+), 13 deletions(-)

diff --git a/cognee/api/v1/add/routers/get_add_router.py b/cognee/api/v1/add/routers/get_add_router.py
index 9de818b7d..f27d559e1 100644
--- a/cognee/api/v1/add/routers/get_add_router.py
+++ b/cognee/api/v1/add/routers/get_add_router.py
@@ -21,7 +21,7 @@ def get_add_router() -> APIRouter:
     async def add(
         data: List[UploadFile] = File(default=None),
         datasetName: Optional[str] = Form(default=None),
-        datasetId: Union[UUID, Literal[""], None] = Form(default=None, examples=[""]),
+        datasetId: Union[UUID, None] = Form(default=None, examples=[""]),
         node_set: Optional[List[str]] = Form(default=[""], example=[""]),
         user: User = Depends(get_authenticated_user),
     ):
diff --git a/cognee/api/v1/memify/routers/get_memify_router.py b/cognee/api/v1/memify/routers/get_memify_router.py
index edac2775a..817eef9bd 100644
--- a/cognee/api/v1/memify/routers/get_memify_router.py
+++ b/cognee/api/v1/memify/routers/get_memify_router.py
@@ -17,15 +17,15 @@ logger = get_logger()
 class MemifyPayloadDTO(InDTO):
-    extraction_tasks: List[str] = Field(
+    extraction_tasks: Optional[List[str]] = Field(
         default=None,
         examples=[[]],
     )
-    enrichment_tasks: List[str] = (Field(default=None, examples=[[]]),)
-    data: Optional[str] = (Field(default=None),)
-    dataset_names: Optional[List[str]] = Field(default=None)
+    enrichment_tasks: Optional[List[str]] = Field(default=None, examples=[[]])
+    data: Optional[str] = Field(default="")
+    dataset_names: Optional[List[str]] = Field(default=None, examples=[[]])
     dataset_ids: Optional[List[UUID]] = Field(default=None, examples=[[]])
-    node_name: Optional[List[str]] = Field(default=None)
+    node_name: Optional[List[str]] = Field(default=None, examples=[[]])
     run_in_background:
Optional[bool] = Field(default=False) @@ -78,10 +78,10 @@ def get_memify_router() -> APIRouter: if not payload.dataset_ids and not payload.dataset_names: raise ValueError("Either datasetId or datasetName must be provided.") - from cognee import memify - try: - memify_run = await memify( + from cognee.modules.memify import memify as cognee_memify + + memify_run = await cognee_memify( extraction_tasks=payload.extraction_tasks, enrichment_tasks=payload.enrichment_tasks, data=payload.data, diff --git a/cognee/modules/data/methods/load_or_create_datasets.py b/cognee/modules/data/methods/load_or_create_datasets.py index 1d6ef3efb..2c9a6497c 100644 --- a/cognee/modules/data/methods/load_or_create_datasets.py +++ b/cognee/modules/data/methods/load_or_create_datasets.py @@ -2,7 +2,7 @@ from typing import List, Union from uuid import UUID from cognee.modules.data.models import Dataset -from cognee.modules.data.methods import create_authorized_dataset +from cognee.modules.data.methods.create_authorized_dataset import create_authorized_dataset from cognee.modules.data.exceptions import DatasetNotFoundError diff --git a/cognee/modules/memify/memify.py b/cognee/modules/memify/memify.py index 80afd7325..d8e1087f2 100644 --- a/cognee/modules/memify/memify.py +++ b/cognee/modules/memify/memify.py @@ -26,8 +26,8 @@ logger = get_logger("memify") async def memify( - extraction_tasks: Union[List[Task], List[str]] = [Task(extract_subgraph_chunks)], - enrichment_tasks: Union[List[Task], List[str]] = [Task(add_rule_associations)], + extraction_tasks: Union[List[Task], List[str]] = None, + enrichment_tasks: Union[List[Task], List[str]] = None, data: Optional[Any] = None, datasets: Union[str, list[str], list[UUID]] = None, user: User = None, @@ -68,6 +68,18 @@ async def memify( Use pipeline_run_id from return value to monitor progress. 
""" + # Use default coding rules tasks if no tasks were provided + if not extraction_tasks: + extraction_tasks = [Task(extract_subgraph_chunks)] + if not enrichment_tasks: + enrichment_tasks = [ + Task( + add_rule_associations, + rules_nodeset_name="coding_agent_rules", + task_config={"batch_size": 1}, + ) + ] + if not data: memory_fragment = await get_memory_fragment(node_type=node_type, node_name=node_name) # Subgraphs should be a single element in the list to represent one data item diff --git a/cognee/modules/retrieval/coding_rules_retriever.py b/cognee/modules/retrieval/coding_rules_retriever.py index 2578d1ee1..364ff3236 100644 --- a/cognee/modules/retrieval/coding_rules_retriever.py +++ b/cognee/modules/retrieval/coding_rules_retriever.py @@ -7,8 +7,11 @@ logger = get_logger("CodingRulesRetriever") class CodingRulesRetriever: """Retriever for handling codeing rule based searches.""" - def __init__(self, rules_nodeset_name): + def __init__(self, rules_nodeset_name="coding_agent_rules"): if isinstance(rules_nodeset_name, list): + if not rules_nodeset_name: + # If there is no provided nodeset set to coding_agent_rules + rules_nodeset_name = ["coding_agent_rules"] rules_nodeset_name = rules_nodeset_name[0] self.rules_nodeset_name = rules_nodeset_name """Initialize retriever with search parameters.""" From 9e201035493e6a38d614db9cbbd87b7d69a926d6 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Thu, 4 Sep 2025 20:59:00 +0200 Subject: [PATCH 13/14] feat: Enable multi-user mode to work with memify --- .../v1/memify/routers/get_memify_router.py | 12 +++---- .../modules/graph/cognee_graph/CogneeGraph.py | 2 +- cognee/modules/memify/memify.py | 32 ++++++++++--------- 3 files changed, 24 insertions(+), 22 deletions(-) diff --git a/cognee/api/v1/memify/routers/get_memify_router.py b/cognee/api/v1/memify/routers/get_memify_router.py index 817eef9bd..cf1df8f71 100644 --- a/cognee/api/v1/memify/routers/get_memify_router.py +++ b/cognee/api/v1/memify/routers/get_memify_router.py @@ -23,8 +23,8 @@ class MemifyPayloadDTO(InDTO): ) enrichment_tasks: Optional[List[str]] = Field(default=None, examples=[[]]) data: Optional[str] = Field(default="") - dataset_names: Optional[List[str]] = Field(default=None, examples=[[]]) - dataset_ids: Optional[List[UUID]] = Field(default=None, examples=[[]]) + dataset_name: Optional[str] = Field(default=None) + dataset_id: Optional[UUID] = Field(default=None, examples=[[""]]) node_name: Optional[List[str]] = Field(default=None, examples=[[]]) run_in_background: Optional[bool] = Field(default=False) @@ -46,8 +46,8 @@ def get_memify_router() -> APIRouter: - **data** Optional[List[str]]: The data to ingest. Can be any text data when custom extraction and enrichment tasks are used. Data provided here will be forwarded to the first extraction task in the pipeline as input. If no data is provided the whole graph (or subgraph if node_name/node_type is specified) will be forwarded - - **dataset_names** (Optional[List[str]]): Name of the datasets to memify - - **dataset_ids** (Optional[List[UUID]]): List of UUIDs of an already existing dataset + - **dataset_name** (Optional[str]): Name of the datasets to memify + - **dataset_id** (Optional[UUID]): List of UUIDs of an already existing dataset - **node_name** (Optional[List[str]]): Filter graph to specific named entities (for targeted search). Used when no data is provided. - **run_in_background** (Optional[bool]): Whether to execute processing asynchronously. Defaults to False (blocking). 
@@ -75,7 +75,7 @@ def get_memify_router() -> APIRouter: additional_properties={"endpoint": "POST /v1/memify"}, ) - if not payload.dataset_ids and not payload.dataset_names: + if not payload.dataset_id and not payload.dataset_name: raise ValueError("Either datasetId or datasetName must be provided.") try: @@ -85,7 +85,7 @@ def get_memify_router() -> APIRouter: extraction_tasks=payload.extraction_tasks, enrichment_tasks=payload.enrichment_tasks, data=payload.data, - datasets=payload.dataset_ids if payload.dataset_ids else payload.dataset_names, + dataset=payload.dataset_id if payload.dataset_id else payload.dataset_name, node_name=payload.node_name, user=user, ) diff --git a/cognee/modules/graph/cognee_graph/CogneeGraph.py b/cognee/modules/graph/cognee_graph/CogneeGraph.py index 924532ce0..acfe04de7 100644 --- a/cognee/modules/graph/cognee_graph/CogneeGraph.py +++ b/cognee/modules/graph/cognee_graph/CogneeGraph.py @@ -76,7 +76,7 @@ class CogneeGraph(CogneeAbstractGraph): start_time = time.time() # Determine projection strategy - if node_type is not None and node_name not in [None, []]: + if node_type is not None and node_name not in [None, [], ""]: nodes_data, edges_data = await adapter.get_nodeset_subgraph( node_type=node_type, node_name=node_name ) diff --git a/cognee/modules/memify/memify.py b/cognee/modules/memify/memify.py index d8e1087f2..2d9b32a1b 100644 --- a/cognee/modules/memify/memify.py +++ b/cognee/modules/memify/memify.py @@ -4,7 +4,7 @@ from uuid import UUID from cognee.shared.logging_utils import get_logger from cognee.modules.retrieval.utils.brute_force_triplet_search import get_memory_fragment - +from cognee.context_global_variables import set_database_global_context_variables from cognee.modules.engine.models.node_set import NodeSet from cognee.modules.pipelines import run_pipeline from cognee.modules.pipelines.tasks.task import Task @@ -29,7 +29,7 @@ async def memify( extraction_tasks: Union[List[Task], List[str]] = None, enrichment_tasks: Union[List[Task], List[str]] = None, data: Optional[Any] = None, - datasets: Union[str, list[str], list[UUID]] = None, + dataset: Union[str, UUID] = "main_dataset", user: User = None, node_type: Optional[Type] = NodeSet, node_name: Optional[List[str]] = None, @@ -53,10 +53,7 @@ async def memify( data: The data to ingest. Can be anything when custom extraction and enrichment tasks are used. Data provided here will be forwarded to the first extraction task in the pipeline as input. If no data is provided the whole graph (or subgraph if node_name/node_type is specified) will be forwarded - datasets: Dataset name(s) or dataset uuid to process. Processes all available datasets if None. - - Single dataset: "my_dataset" - - Multiple datasets: ["docs", "research", "reports"] - - None: Process all datasets for the user + dataset: Dataset name or dataset uuid to process. user: User context for authentication and data access. Uses default if None. node_type: Filter graph to specific entity types (for advanced filtering). Used when no data is provided. node_name: Filter graph to specific named entities (for targeted search). Used when no data is provided. 
@@ -80,7 +77,17 @@ async def memify( ) ] + await setup() + + user, authorized_dataset_list = await resolve_authorized_user_datasets(dataset, user) + authorized_dataset = authorized_dataset_list[0] + if not data: + # Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True + await set_database_global_context_variables( + authorized_dataset.id, authorized_dataset.owner_id + ) + memory_fragment = await get_memory_fragment(node_type=node_type, node_name=node_name) # Subgraphs should be a single element in the list to represent one data item data = [memory_fragment] @@ -90,14 +97,9 @@ async def memify( *enrichment_tasks, ] - await setup() - - user, authorized_datasets = await resolve_authorized_user_datasets(datasets, user) - - for dataset in authorized_datasets: - await reset_dataset_pipeline_run_status( - dataset.id, user, pipeline_names=["memify_pipeline"] - ) + await reset_dataset_pipeline_run_status( + authorized_dataset.id, user, pipeline_names=["memify_pipeline"] + ) # By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for pipeline_executor_func = get_pipeline_executor(run_in_background=run_in_background) @@ -108,7 +110,7 @@ async def memify( tasks=memify_tasks, user=user, data=data, - datasets=datasets, + datasets=authorized_dataset.id, vector_db_config=vector_db_config, graph_db_config=graph_db_config, incremental_loading=False, From 0c7ba7c23610cf966c5660b9ad8d6f5f054dc573 Mon Sep 17 00:00:00 2001 From: Igor Ilic Date: Thu, 4 Sep 2025 21:05:24 +0200 Subject: [PATCH 14/14] refactor: Allow none through swagger --- cognee/api/v1/add/routers/get_add_router.py | 3 ++- cognee/api/v1/memify/routers/get_memify_router.py | 5 +++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cognee/api/v1/add/routers/get_add_router.py b/cognee/api/v1/add/routers/get_add_router.py index f27d559e1..dfa7d275b 100644 --- a/cognee/api/v1/add/routers/get_add_router.py +++ b/cognee/api/v1/add/routers/get_add_router.py @@ -21,7 +21,8 @@ def get_add_router() -> APIRouter: async def add( data: List[UploadFile] = File(default=None), datasetName: Optional[str] = Form(default=None), - datasetId: Union[UUID, None] = Form(default=None, examples=[""]), + # Note: Literal is needed for Swagger use + datasetId: Union[UUID, Literal[""], None] = Form(default=None, examples=[""]), node_set: Optional[List[str]] = Form(default=[""], example=[""]), user: User = Depends(get_authenticated_user), ): diff --git a/cognee/api/v1/memify/routers/get_memify_router.py b/cognee/api/v1/memify/routers/get_memify_router.py index cf1df8f71..1976d7414 100644 --- a/cognee/api/v1/memify/routers/get_memify_router.py +++ b/cognee/api/v1/memify/routers/get_memify_router.py @@ -4,7 +4,7 @@ from fastapi import APIRouter from fastapi.responses import JSONResponse from fastapi import Depends from pydantic import Field -from typing import List, Optional +from typing import List, Optional, Union, Literal from cognee.api.DTO import InDTO from cognee.modules.users.models import User @@ -24,7 +24,8 @@ class MemifyPayloadDTO(InDTO): enrichment_tasks: Optional[List[str]] = Field(default=None, examples=[[]]) data: Optional[str] = Field(default="") dataset_name: Optional[str] = Field(default=None) - dataset_id: Optional[UUID] = Field(default=None, examples=[[""]]) + # Note: Literal is needed for Swagger use + dataset_id: Union[UUID, Literal[""], None] = Field(default=None, examples=[""]) node_name: Optional[List[str]] = Field(default=None, 
examples=[[]]) run_in_background: Optional[bool] = Field(default=False)
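
As a closing reference, a hedged sketch of calling the finished endpoint over HTTP. The host, port, and bearer-token auth are assumptions, and the JSON keys follow the snake_case field names of MemifyPayloadDTO (InDTO may expect camelCase aliases, so adjust if requests are rejected):

import requests

response = requests.post(
    "http://localhost:8000/api/v1/memify",  # assumed local deployment
    headers={"Authorization": "Bearer <your token>"},  # assumed auth scheme
    json={
        # Either a dataset name or a dataset id must be provided
        "dataset_name": "main_dataset",
        "node_name": ["coding_agent_rules"],
        "run_in_background": False,
    },
    timeout=300,
)
response.raise_for_status()
print(response.json())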