feat: Memify pipeline initial commit

Igor Ilic 2025-09-02 21:32:09 +02:00
parent 72e5b2bec8
commit af084af70f
11 changed files with 275 additions and 16 deletions

View file

@@ -150,7 +150,9 @@ async def add(
    user, authorized_dataset = await resolve_authorized_user_dataset(dataset_id, dataset_name, user)

-    await reset_dataset_pipeline_run_status(authorized_dataset.id, user)
+    await reset_dataset_pipeline_run_status(
+        authorized_dataset.id, user, pipeline_names=["add_pipeline", "cognify_pipeline"]
+    )

    pipeline_run_info = None

View file

@@ -1,28 +1,33 @@
from pydantic import BaseModel
from typing import Union, Optional, List, Type
from uuid import UUID
from cognee.shared.logging_utils import get_logger
from cognee.shared.data_models import KnowledgeGraph
from cognee.infrastructure.llm import get_max_chunk_tokens
from cognee.modules.retrieval.utils.brute_force_triplet_search import get_memory_fragment
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
from cognee.modules.engine.models.node_set import NodeSet
from cognee.modules.pipelines import run_pipeline
from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.chunking.TextChunker import TextChunker
from cognee.modules.ontology.rdf_xml.OntologyResolver import OntologyResolver
from cognee.modules.users.models import User
from cognee.modules.pipelines.layers.resolve_authorized_user_datasets import (
    resolve_authorized_user_datasets,
)
from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
    reset_dataset_pipeline_run_status,
)
from cognee.modules.engine.operations.setup import setup
-from cognee.tasks.memify import extract_subgraph
+from cognee.tasks.memify.extract_subgraph import extract_subgraph
from cognee.tasks.codingagents.coding_rule_associations import add_rule_associations
from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor

logger = get_logger("memify")


async def memify(
+    tasks: List[Task],
    datasets: Union[str, list[str], list[UUID]] = None,
    user: User = None,
-    tasks: List[Task] = None,
    node_type: Optional[Type] = NodeSet,
    node_name: Optional[List[str]] = None,
    cypher_query: Optional[str] = None,
@@ -50,11 +55,35 @@ async def memify(
    Background mode is recommended for large datasets (>100MB).
    Use the pipeline_run_id from the return value to monitor progress.
    """
    if cypher_query:
        pass
    else:
        memory_fragment = await get_memory_fragment(node_type=node_type, node_name=node_name)
        # Wrap the edge list as a single element so the whole memory fragment is one data item
        data = [memory_fragment.edges]

    memify_tasks = [
-        Task(extract_subgraph, cypher_query=cypher_query, node_type=node_type, node_name=node_name),
-        *tasks,  # Unpack tasks provided to memify pipeline
+        Task(extract_subgraph),
+        Task(CogneeGraph.resolve_edges_to_text, task_config={"batch_size": 10}),
+        Task(
+            add_rule_associations,
+            rules_nodeset_name="coding_agent_rules",
+            user_prompt_location="memify_coding_rule_association_agent_user.txt",
+            system_prompt_location="memify_coding_rule_association_agent_system.txt",
+        ),
+        # *tasks,  # Unpack tasks provided to memify pipeline
    ]

    await setup()

    user, authorized_datasets = await resolve_authorized_user_datasets(datasets, user)

    for dataset in authorized_datasets:
        await reset_dataset_pipeline_run_status(
            dataset.id, user, pipeline_names=["memify_pipeline"]
        )

    # get_pipeline_executor returns either a function that launches run_pipeline in the
    # background or one that runs it to completion and must be awaited.
    pipeline_executor_func = get_pipeline_executor(run_in_background=run_in_background)
@@ -63,6 +92,7 @@ async def memify(
        pipeline=run_pipeline,
        tasks=memify_tasks,
        user=user,
+        data=data,
        datasets=datasets,
        vector_db_config=vector_db_config,
        graph_db_config=graph_db_config,
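The docstring above recommends background mode for large datasets and monitoring the run through the returned pipeline_run_id. A minimal usage sketch under those assumptions; the top-level cognee.memify import, the custom task, the dataset name, and the pipeline_run_id attribute on the returned run info are illustrative and not guaranteed by this diff:

```python
import asyncio

import cognee  # assumes memify is re-exported at package level, like cognee.add / cognee.cognify
from cognee.modules.pipelines.tasks.task import Task


async def annotate_edges(subgraphs):
    # Hypothetical custom task: pass the extracted subgraph edges through unchanged.
    for subgraph in subgraphs:
        for edge in subgraph:
            yield edge


async def main():
    run_info = await cognee.memify(
        tasks=[Task(annotate_edges)],
        datasets=["my_dataset"],  # illustrative dataset name
        run_in_background=True,  # return immediately; the pipeline keeps running
    )
    print(run_info.pipeline_run_id)  # assumed attribute, per the docstring above


asyncio.run(main())
```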

View file

@@ -0,0 +1,6 @@
You are an association agent tasked with suggesting structured developer rules from user-agent interactions stored in a knowledge graph.
You will receive the actual user-agent interaction as a set of relationships from a knowledge graph, separated by \n---\n and each represented as a node1 -- relation -- node2 triplet, together with the list of already existing developer rules.
Each rule represents a single best practice or guideline the agent should follow in the future.
Suggest rules that are general rather than specific to the current text, strictly technical, and that add value and improve future agent behavior.
Do not suggest rules that are similar to existing ones, that are not general, or that do not add value.
It is acceptable to return an empty rule list.
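For orientation, a sketch of the structured output this prompt is meant to elicit; it corresponds to the RuleSet and Rule models added later in this commit, and the rule texts are invented:

```python
# Hypothetical response the agent might return; it is parsed into RuleSet(rules=[Rule(text=...), ...]).
expected_output = {
    "rules": [
        {"text": "Prefer pytest fixtures over manual setup and teardown in tests."},
        {"text": "Never hard-code temporary file paths; use the platform temp directory."},
    ]
}
```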

View file

@@ -0,0 +1,6 @@
**Here is the user-agent interaction context, provided as a set of relationships from a knowledge graph, separated by \n---\n and each represented as a node1 -- relation -- node2 triplet:**
`{{ chat }}`

**Already existing rules:**
`{{ rules }}`
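A small sketch of the context these placeholders expect: chat is a \n---\n-separated list of node1 -- relation -- node2 triplets and rules is the list of existing rules. All values below are made up:

```python
# Illustrative values for rendering the template above (not taken from the diff).
chat = "\n---\n".join(
    [
        "user_message -- asks_about -- pytest fixtures",
        "agent_reply -- recommends -- tmp_path fixture",
        "agent_reply -- warns_against -- hard-coded temp directories",
    ]
)
rules = "\n".join(
    [
        "- Prefer pytest fixtures over manual setup and teardown.",
        "- Keep test data isolated per test case.",
    ]
)
context = {"chat": chat, "rules": rules}  # keys match the {{ chat }} / {{ rules }} placeholders
```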

View file

@@ -188,3 +188,72 @@ class CogneeGraph(CogneeAbstractGraph):
            return n1 + n2 + e

        return heapq.nsmallest(k, self.edges, key=score)

    @staticmethod
    async def resolve_edges_to_text(retrieved_edges: list) -> str:
        """
        Converts retrieved graph edges into a human-readable string format.

        Parameters:
        -----------

            - retrieved_edges (list): A list of edges retrieved from the graph.

        Returns:
        --------

            - str: A formatted string representation of the nodes and their connections.
        """

        def _get_nodes(retrieved_edges: list) -> dict:
            def _get_title(text: str, first_n_words: int = 7, top_n_words: int = 3) -> str:
                def _top_n_words(text, stop_words=None, top_n=3, separator=", "):
                    """Concatenates the top N frequent words in text."""
                    if stop_words is None:
                        from cognee.modules.retrieval.utils.stop_words import DEFAULT_STOP_WORDS

                        stop_words = DEFAULT_STOP_WORDS

                    import string

                    words = [word.lower().strip(string.punctuation) for word in text.split()]
                    if stop_words:
                        words = [word for word in words if word and word not in stop_words]

                    from collections import Counter

                    top_words = [word for word, freq in Counter(words).most_common(top_n)]
                    return separator.join(top_words)

                """Creates a title, by combining first words with most frequent words from the text."""
                first_n_words = text.split()[:first_n_words]
                top_n_words = _top_n_words(text, top_n=top_n_words)
                return f"{' '.join(first_n_words)}... [{top_n_words}]"

            """Creates a dictionary of nodes with their names and content."""
            nodes = {}
            for edge in retrieved_edges:
                for node in (edge.node1, edge.node2):
                    if node.id not in nodes:
                        text = node.attributes.get("text")
                        if text:
                            name = _get_title(text)
                            content = text
                        else:
                            name = node.attributes.get("name", "Unnamed Node")
                            content = node.attributes.get("description", name)
                        nodes[node.id] = {"node": node, "name": name, "content": content}
            return nodes

        nodes = _get_nodes(retrieved_edges)

        node_section = "\n".join(
            f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n"
            for info in nodes.values()
        )
        connection_section = "\n".join(
            f"{nodes[edge.node1.id]['name']} --[{edge.attributes['relationship_type']}]--> {nodes[edge.node2.id]['name']}"
            for edge in retrieved_edges
        )
        return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}"

View file

@@ -1,12 +1,28 @@
from uuid import UUID
from typing import Optional, List

from cognee.modules.pipelines.methods import get_pipeline_runs_by_dataset, reset_pipeline_run_status
from cognee.modules.pipelines.models.PipelineRun import PipelineRunStatus
from cognee.modules.users.models import User


-async def reset_dataset_pipeline_run_status(dataset_id: UUID, user: User):
+async def reset_dataset_pipeline_run_status(
+    dataset_id: UUID, user: User, pipeline_names: Optional[list[str]] = None
+):
    """Reset the status of all (or selected) pipeline runs for a dataset.

    If *pipeline_names* is given, only runs whose *pipeline_name* is in
    that list are reset.
    """
    related_pipeline_runs = await get_pipeline_runs_by_dataset(dataset_id)

    for pipeline_run in related_pipeline_runs:
-        if pipeline_run.status is not PipelineRunStatus.DATASET_PROCESSING_INITIATED:
-            await reset_pipeline_run_status(user.id, dataset_id, pipeline_run.pipeline_name)
+        # Skip runs that are still in the initiated state
+        if pipeline_run.status is PipelineRunStatus.DATASET_PROCESSING_INITIATED:
+            continue
+        # If a name filter is provided, skip non-matching runs
+        if pipeline_names is not None and pipeline_run.pipeline_name not in pipeline_names:
+            continue
+        await reset_pipeline_run_status(user.id, dataset_id, pipeline_run.pipeline_name)
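A short sketch of the new filter, assuming an async context where dataset_id and user already exist:

```python
# Reset every non-initiated pipeline run for the dataset (previous behaviour):
await reset_dataset_pipeline_run_status(dataset_id, user)

# Reset only the memify runs, leaving add/cognify run statuses untouched:
await reset_dataset_pipeline_run_status(dataset_id, user, pipeline_names=["memify_pipeline"])
```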

View file

@@ -5,6 +5,7 @@ from typing import Union
from cognee.modules.pipelines.layers.setup_and_check_environment import (
    setup_and_check_environment,
)
from cognee.shared.logging_utils import get_logger
from cognee.modules.data.methods.get_dataset_data import get_dataset_data
from cognee.modules.data.models import Data, Dataset

View file

View file

@@ -0,0 +1,124 @@
from uuid import NAMESPACE_OID, uuid5
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.low_level import DataPoint
from cognee.infrastructure.llm import LLMGateway
from cognee.shared.logging_utils import get_logger
from cognee.modules.engine.models import NodeSet
from cognee.tasks.storage import add_data_points, index_graph_edges
from typing import Optional, List, Any
from pydantic import Field

logger = get_logger("coding_rule_association")


class Rule(DataPoint):
    """A single developer rule extracted from text."""

    text: str = Field(..., description="The coding rule associated with the conversation")
    belongs_to_set: Optional[NodeSet] = None
    metadata: dict = {"index_fields": ["rule"]}


class RuleSet(DataPoint):
    """Collection of parsed rules."""

    rules: List[Rule] = Field(
        ...,
        description="List of developer rules extracted from the input text. Each rule represents a coding best practice or guideline.",
    )


async def get_existing_rules(rules_nodeset_name: str) -> str:
    graph_engine = await get_graph_engine()

    nodes_data, _ = await graph_engine.get_nodeset_subgraph(
        node_type=NodeSet, node_name=[rules_nodeset_name]
    )

    existing_rules = [
        item[1]["text"]
        for item in nodes_data
        if isinstance(item, tuple)
        and len(item) == 2
        and isinstance(item[1], dict)
        and "text" in item[1]
    ]

    existing_rules = "\n".join(f"- {rule}" for rule in existing_rules)

    return existing_rules


async def get_origin_edges(data: str, rules: List[Rule]) -> list[Any]:
    vector_engine = get_vector_engine()
    origin_chunk = await vector_engine.search("DocumentChunk_text", data, limit=1)

    try:
        origin_id = origin_chunk[0].id
    except (AttributeError, KeyError, TypeError, IndexError):
        origin_id = None

    relationships = []
    if origin_id and isinstance(rules, (list, tuple)) and len(rules) > 0:
        for rule in rules:
            try:
                rule_id = getattr(rule, "id", None)
                if rule_id is not None:
                    rel_name = "rule_associated_from"
                    relationships.append(
                        (
                            rule_id,
                            origin_id,
                            rel_name,
                            {
                                "relationship_name": rel_name,
                                "source_node_id": rule_id,
                                "target_node_id": origin_id,
                                "ontology_valid": False,
                            },
                        )
                    )
            except Exception as e:
                logger.info(f"Warning: Skipping invalid rule due to error: {e}")
    else:
        logger.info("No valid origin_id or rules provided.")

    return relationships


async def add_rule_associations(
    data: str,
    rules_nodeset_name: str,
    user_prompt_location: str = "coding_rule_association_agent_user.txt",
    system_prompt_location: str = "coding_rule_association_agent_system.txt",
):
    graph_engine = await get_graph_engine()

    existing_rules = await get_existing_rules(rules_nodeset_name=rules_nodeset_name)

    user_context = {"user data": data, "rules": existing_rules}
    user_prompt = LLMGateway.render_prompt(user_prompt_location, context=user_context)
    system_prompt = LLMGateway.render_prompt(system_prompt_location, context={})

    rule_list = await LLMGateway.acreate_structured_output(
        text_input=user_prompt, system_prompt=system_prompt, response_model=RuleSet
    )

    rules_nodeset = NodeSet(
        id=uuid5(NAMESPACE_OID, name=rules_nodeset_name), name=rules_nodeset_name
    )

    for rule in rule_list.rules:
        rule.belongs_to_set = rules_nodeset

    edges_to_save = await get_origin_edges(data=data, rules=rule_list.rules)

    await add_data_points(data_points=rule_list.rules)

    if len(edges_to_save) > 0:
        await graph_engine.add_edges(edges_to_save)
        await index_graph_edges()
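A usage sketch for the new task, assuming cognee's databases and LLM are already configured and the interaction text has been ingested so the DocumentChunk_text search can find an origin chunk; the chat text is illustrative:

```python
import asyncio

from cognee.tasks.codingagents.coding_rule_associations import add_rule_associations


async def main():
    chat_text = (
        "user: how should I handle temporary files in tests?\n"
        "agent: use pytest's tmp_path fixture instead of hard-coded paths."
    )
    # Extracts candidate rules from the interaction via the LLM, attaches them to the
    # "coding_agent_rules" NodeSet, and links them back to the originating chunk.
    await add_rule_associations(data=chat_text, rules_nodeset_name="coding_agent_rules")


asyncio.run(main())
```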

View file

@@ -1 +1 @@
-from extract_subgraph import extract_subgraph
+from .extract_subgraph import extract_subgraph

View file

@@ -1,2 +1,7 @@
-async def extract_subgraph():
-    pass
+from cognee.modules.retrieval.utils.brute_force_triplet_search import get_memory_fragment
+
+
+async def extract_subgraph(subgraphs):
+    for subgraph in subgraphs:
+        for edge in subgraph:
+            yield edge