Memify pipeline (#1329)


## Description
This PR adds the memify enrichment pipeline (Python entry point and `/api/v1/memify` router), the supporting memify tasks, a `CODING_RULES` search type, and an example script demonstrating coding-rule extraction.
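
A minimal sketch of the flow this PR enables, after the usual `add`/`cognify` steps (the full version ships as the example script in this PR):

```python
# Minimal sketch: enrich an already cognified dataset with the default memify tasks.
import asyncio

import cognee
from cognee import memify


async def main():
    await cognee.add(["Susan should always review new code changes before merging to main."])
    await cognee.cognify()

    # With no tasks given, the default coding-rule extraction/enrichment tasks run
    # over the projected graph of "main_dataset".
    await memify()

    rules = await cognee.search(
        query_text="List me the coding rules",
        query_type=cognee.SearchType.CODING_RULES,
        node_name=["coding_agent_rules"],
    )
    print(rules)


asyncio.run(main())
```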

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
Commit db3746d769 by Vasilije, 2025-09-04 21:23:41 +02:00, committed via GitHub
25 changed files with 606 additions and 58 deletions

View file

@ -18,6 +18,7 @@ logger = setup_logging()
from .api.v1.add import add
from .api.v1.delete import delete
from .api.v1.cognify import cognify
+from .modules.memify import memify
from .api.v1.config.config import config
from .api.v1.datasets.datasets import datasets
from .api.v1.prune import prune

View file

@ -22,6 +22,7 @@ from cognee.api.v1.settings.routers import get_settings_router
from cognee.api.v1.datasets.routers import get_datasets_router
from cognee.api.v1.cognify.routers import get_code_pipeline_router, get_cognify_router
from cognee.api.v1.search.routers import get_search_router
+from cognee.api.v1.memify.routers import get_memify_router
from cognee.api.v1.add.routers import get_add_router
from cognee.api.v1.delete.routers import get_delete_router
from cognee.api.v1.responses.routers import get_responses_router
@ -235,6 +236,8 @@ app.include_router(get_add_router(), prefix="/api/v1/add", tags=["add"])
app.include_router(get_cognify_router(), prefix="/api/v1/cognify", tags=["cognify"])
+app.include_router(get_memify_router(), prefix="/api/v1/memify", tags=["memify"])
app.include_router(get_search_router(), prefix="/api/v1/search", tags=["search"])
app.include_router(

View file

@ -150,7 +150,9 @@ async def add(
    user, authorized_dataset = await resolve_authorized_user_dataset(dataset_id, dataset_name, user)
-    await reset_dataset_pipeline_run_status(authorized_dataset.id, user)
+    await reset_dataset_pipeline_run_status(
+        authorized_dataset.id, user, pipeline_names=["add_pipeline", "cognify_pipeline"]
+    )

    pipeline_run_info = None

View file

@ -1,6 +1,3 @@
-import os
-import requests
-import subprocess
from uuid import UUID
from fastapi import APIRouter
@ -24,6 +21,7 @@ def get_add_router() -> APIRouter:
    async def add(
        data: List[UploadFile] = File(default=None),
        datasetName: Optional[str] = Form(default=None),
+        # Note: Literal is needed for Swagger use
        datasetId: Union[UUID, Literal[""], None] = Form(default=None, examples=[""]),
        node_set: Optional[List[str]] = Form(default=[""], example=[""]),
        user: User = Depends(get_authenticated_user),
@ -60,9 +58,6 @@ def get_add_router() -> APIRouter:
        ## Notes
        - To add data to datasets not owned by the user, use dataset_id (when ENABLE_BACKEND_ACCESS_CONTROL is set to True)
-        - GitHub repositories are cloned and all files are processed
-        - HTTP URLs are fetched and their content is processed
-        - The ALLOW_HTTP_REQUESTS environment variable controls URL processing
        - datasetId value can only be the UUID of an already existing dataset
        """
        send_telemetry(

View file

View file

@ -0,0 +1 @@
from .get_memify_router import get_memify_router

View file

@ -0,0 +1,100 @@
from uuid import UUID
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from fastapi import Depends
from pydantic import Field
from typing import List, Optional, Union, Literal

from cognee.api.DTO import InDTO
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_authenticated_user
from cognee.shared.utils import send_telemetry
from cognee.modules.pipelines.models import PipelineRunErrored
from cognee.shared.logging_utils import get_logger

logger = get_logger()


class MemifyPayloadDTO(InDTO):
    extraction_tasks: Optional[List[str]] = Field(
        default=None,
        examples=[[]],
    )
    enrichment_tasks: Optional[List[str]] = Field(default=None, examples=[[]])
    data: Optional[str] = Field(default="")
    dataset_name: Optional[str] = Field(default=None)
    # Note: Literal is needed for Swagger use
    dataset_id: Union[UUID, Literal[""], None] = Field(default=None, examples=[""])
    node_name: Optional[List[str]] = Field(default=None, examples=[[]])
    run_in_background: Optional[bool] = Field(default=False)


def get_memify_router() -> APIRouter:
    router = APIRouter()

    @router.post("", response_model=dict)
    async def memify(payload: MemifyPayloadDTO, user: User = Depends(get_authenticated_user)):
        """
        Enrichment pipeline in Cognee that can work with already built graphs. If no data is provided,
        the existing knowledge graph is used as the data; custom data can be provided instead and is
        processed with the supplied extraction and enrichment tasks.

        The provided tasks and data are arranged into a Cognee pipeline that executes graph enrichment/creation.

        ## Request Parameters
        - **extractionTasks** (Optional[List[str]]): List of available Cognee Tasks to execute for graph/data extraction.
        - **enrichmentTasks** (Optional[List[str]]): List of available Cognee Tasks that enrich the graph/data produced by the extraction tasks.
        - **data** (Optional[str]): The data to ingest. Can be any text data when custom extraction and enrichment tasks are used.
          Data provided here is forwarded to the first extraction task in the pipeline as input.
          If no data is provided, the whole graph (or a subgraph if node_name/node_type is specified) is forwarded instead.
        - **dataset_name** (Optional[str]): Name of the dataset to memify.
        - **dataset_id** (Optional[UUID]): UUID of an already existing dataset.
        - **node_name** (Optional[List[str]]): Filter the graph to specific named entities (for targeted search). Used when no data is provided.
        - **run_in_background** (Optional[bool]): Whether to execute processing asynchronously. Defaults to False (blocking).

        Either datasetName or datasetId must be provided.

        ## Response
        Returns information about the memify operation containing:
        - Status of the operation
        - Details about the processed data
        - Any relevant metadata from the ingestion process

        ## Error Codes
        - **400 Bad Request**: Neither datasetId nor datasetName provided
        - **409 Conflict**: Error during memify operation
        - **403 Forbidden**: User doesn't have permission to use dataset

        ## Notes
        - To memify datasets not owned by the user, use dataset_id (when ENABLE_BACKEND_ACCESS_CONTROL is set to True)
        - datasetId value can only be the UUID of an already existing dataset
        """
        send_telemetry(
            "Memify API Endpoint Invoked",
            user.id,
            additional_properties={"endpoint": "POST /v1/memify"},
        )

        if not payload.dataset_id and not payload.dataset_name:
            raise ValueError("Either datasetId or datasetName must be provided.")

        try:
            from cognee.modules.memify import memify as cognee_memify

            memify_run = await cognee_memify(
                extraction_tasks=payload.extraction_tasks,
                enrichment_tasks=payload.enrichment_tasks,
                data=payload.data,
                dataset=payload.dataset_id if payload.dataset_id else payload.dataset_name,
                node_name=payload.node_name,
                user=user,
            )

            if isinstance(memify_run, PipelineRunErrored):
                return JSONResponse(status_code=420, content=memify_run)

            return memify_run
        except Exception as error:
            return JSONResponse(status_code=409, content={"error": str(error)})

    return router
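
For reference, a rough sketch of calling the new endpoint over HTTP. The base URL, bearer-token auth, and exact field casing are assumptions (field names follow the camelCase convention used in the docstring; `InDTO` maps them onto the snake_case fields above); any HTTP client works, `httpx` is just an example:

```python
# Sketch only: POST to the memify endpoint of a locally running cognee API (assumed setup).
import httpx

payload = {
    "datasetName": "main_dataset",  # or "datasetId": "<UUID of an existing dataset>"
    "extractionTasks": [],          # empty/None means the server-side default tasks are used
    "enrichmentTasks": [],
}

response = httpx.post(
    "http://localhost:8000/api/v1/memify",        # assumed local base URL
    json=payload,
    headers={"Authorization": "Bearer <token>"},  # assumed auth scheme
    timeout=None,
)
print(response.status_code, response.json())
```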

View file

@ -2,7 +2,7 @@ from typing import List, Union
from uuid import UUID
from cognee.modules.data.models import Dataset
-from cognee.modules.data.methods import create_authorized_dataset
+from cognee.modules.data.methods.create_authorized_dataset import create_authorized_dataset
from cognee.modules.data.exceptions import DatasetNotFoundError

View file

@ -76,7 +76,7 @@ class CogneeGraph(CogneeAbstractGraph):
        start_time = time.time()

        # Determine projection strategy
-        if node_type is not None and node_name not in [None, []]:
+        if node_type is not None and node_name not in [None, [], ""]:
            nodes_data, edges_data = await adapter.get_nodeset_subgraph(
                node_type=node_type, node_name=node_name
            )

View file

@ -4,3 +4,4 @@ from .get_model_instance_from_graph import get_model_instance_from_graph
from .retrieve_existing_edges import retrieve_existing_edges
from .convert_node_to_data_point import convert_node_to_data_point
from .deduplicate_nodes_and_edges import deduplicate_nodes_and_edges
+from .resolve_edges_to_text import resolve_edges_to_text

View file

@ -0,0 +1,67 @@
async def resolve_edges_to_text(retrieved_edges: list) -> str:
    """
    Converts retrieved graph edges into a human-readable string format.

    Parameters:
    -----------
        - retrieved_edges (list): A list of edges retrieved from the graph.

    Returns:
    --------
        - str: A formatted string representation of the nodes and their connections.
    """

    def _get_nodes(retrieved_edges: list) -> dict:
        """Creates a dictionary of nodes with their names and content."""

        def _get_title(text: str, first_n_words: int = 7, top_n_words: int = 3) -> str:
            """Creates a title, by combining first words with most frequent words from the text."""

            def _top_n_words(text, stop_words=None, top_n=3, separator=", "):
                """Concatenates the top N frequent words in text."""
                if stop_words is None:
                    from cognee.modules.retrieval.utils.stop_words import DEFAULT_STOP_WORDS

                    stop_words = DEFAULT_STOP_WORDS

                import string

                words = [word.lower().strip(string.punctuation) for word in text.split()]
                if stop_words:
                    words = [word for word in words if word and word not in stop_words]

                from collections import Counter

                top_words = [word for word, freq in Counter(words).most_common(top_n)]
                return separator.join(top_words)

            first_n_words = text.split()[:first_n_words]
            top_n_words = _top_n_words(text, top_n=top_n_words)
            return f"{' '.join(first_n_words)}... [{top_n_words}]"

        nodes = {}
        for edge in retrieved_edges:
            for node in (edge.node1, edge.node2):
                if node.id not in nodes:
                    text = node.attributes.get("text")
                    if text:
                        name = _get_title(text)
                        content = text
                    else:
                        name = node.attributes.get("name", "Unnamed Node")
                        content = node.attributes.get("description", name)
                    nodes[node.id] = {"node": node, "name": name, "content": content}
        return nodes

    nodes = _get_nodes(retrieved_edges)

    node_section = "\n".join(
        f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n"
        for info in nodes.values()
    )
    connection_section = "\n".join(
        f"{nodes[edge.node1.id]['name']} --[{edge.attributes['relationship_type']}]--> {nodes[edge.node2.id]['name']}"
        for edge in retrieved_edges
    )
    return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}"

View file

@ -0,0 +1 @@
from .memify import memify

View file

@ -0,0 +1,118 @@
from typing import Union, Optional, List, Type, Any
from uuid import UUID

from cognee.shared.logging_utils import get_logger
from cognee.modules.retrieval.utils.brute_force_triplet_search import get_memory_fragment
from cognee.context_global_variables import set_database_global_context_variables
from cognee.modules.engine.models.node_set import NodeSet
from cognee.modules.pipelines import run_pipeline
from cognee.modules.pipelines.tasks.task import Task
from cognee.modules.users.models import User
from cognee.modules.pipelines.layers.resolve_authorized_user_datasets import (
    resolve_authorized_user_datasets,
)
from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
    reset_dataset_pipeline_run_status,
)
from cognee.modules.engine.operations.setup import setup
from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor
from cognee.tasks.memify.extract_subgraph_chunks import extract_subgraph_chunks
from cognee.tasks.codingagents.coding_rule_associations import (
    add_rule_associations,
)

logger = get_logger("memify")


async def memify(
    extraction_tasks: Union[List[Task], List[str]] = None,
    enrichment_tasks: Union[List[Task], List[str]] = None,
    data: Optional[Any] = None,
    dataset: Union[str, UUID] = "main_dataset",
    user: User = None,
    node_type: Optional[Type] = NodeSet,
    node_name: Optional[List[str]] = None,
    vector_db_config: Optional[dict] = None,
    graph_db_config: Optional[dict] = None,
    run_in_background: bool = False,
):
    """
    Enrichment pipeline in Cognee that can work with already built graphs. If no data is provided,
    the existing knowledge graph is used as the data; custom data can be provided instead and is
    processed with the supplied extraction and enrichment tasks.

    The provided tasks and data are arranged into a Cognee pipeline that executes graph enrichment/creation.

    This is the core processing step in Cognee that converts raw text and documents
    into an intelligent knowledge graph. It analyzes content, extracts entities and
    relationships, and creates semantic connections for enhanced search and reasoning.

    Args:
        extraction_tasks: List of Cognee Tasks to execute for graph/data extraction.
        enrichment_tasks: List of Cognee Tasks that enrich the graph/data produced by the extraction tasks.
        data: The data to ingest. Can be anything when custom extraction and enrichment tasks are used.
            Data provided here will be forwarded to the first extraction task in the pipeline as input.
            If no data is provided, the whole graph (or a subgraph if node_name/node_type is specified) will be forwarded.
        dataset: Dataset name or dataset uuid to process.
        user: User context for authentication and data access. Uses default if None.
        node_type: Filter graph to specific entity types (for advanced filtering). Used when no data is provided.
        node_name: Filter graph to specific named entities (for targeted search). Used when no data is provided.
        vector_db_config: Custom vector database configuration for embeddings storage.
        graph_db_config: Custom graph database configuration for relationship storage.
        run_in_background: If True, starts processing asynchronously and returns immediately.
            If False, waits for completion before returning.
            Background mode is recommended for large datasets (>100MB).
            Use pipeline_run_id from the return value to monitor progress.
    """
    # Use default coding rules tasks if no tasks were provided
    if not extraction_tasks:
        extraction_tasks = [Task(extract_subgraph_chunks)]
    if not enrichment_tasks:
        enrichment_tasks = [
            Task(
                add_rule_associations,
                rules_nodeset_name="coding_agent_rules",
                task_config={"batch_size": 1},
            )
        ]

    await setup()

    user, authorized_dataset_list = await resolve_authorized_user_datasets(dataset, user)
    authorized_dataset = authorized_dataset_list[0]

    if not data:
        # Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True
        await set_database_global_context_variables(
            authorized_dataset.id, authorized_dataset.owner_id
        )

        memory_fragment = await get_memory_fragment(node_type=node_type, node_name=node_name)
        # Subgraphs should be a single element in the list to represent one data item
        data = [memory_fragment]

    memify_tasks = [
        *extraction_tasks,  # Unpack tasks provided to memify pipeline
        *enrichment_tasks,
    ]

    await reset_dataset_pipeline_run_status(
        authorized_dataset.id, user, pipeline_names=["memify_pipeline"]
    )

    # get_pipeline_executor returns either a function that runs run_pipeline in the background
    # or one that has to be awaited until the run completes
    pipeline_executor_func = get_pipeline_executor(run_in_background=run_in_background)

    # Run run_pipeline in the background or blocking, depending on the executor
    return await pipeline_executor_func(
        pipeline=run_pipeline,
        tasks=memify_tasks,
        user=user,
        data=data,
        datasets=authorized_dataset.id,
        vector_db_config=vector_db_config,
        graph_db_config=graph_db_config,
        incremental_loading=False,
        pipeline_name="memify_pipeline",
    )
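
A sketch of the custom-task path, where explicit data is forwarded to the first extraction task instead of the projected graph. `forward_text` is a hypothetical task written for this sketch; it assumes the first task receives the `data` list, the same way `extract_subgraph_chunks` receives `[memory_fragment]` above:

```python
# Sketch: run memify with explicit tasks and custom data instead of the projected graph.
import asyncio

from cognee import memify
from cognee.modules.pipelines.tasks.task import Task
from cognee.tasks.codingagents.coding_rule_associations import add_rule_associations


async def forward_text(data):
    # Hypothetical extraction task: receives the `data` passed to memify and yields items onward.
    for item in data:
        yield item


async def main():
    await memify(
        extraction_tasks=[Task(forward_text)],
        enrichment_tasks=[
            Task(
                add_rule_associations,
                rules_nodeset_name="coding_agent_rules",
                task_config={"batch_size": 1},
            )
        ],
        data=["New releases should not happen on Friday."],
        dataset="main_dataset",
    )


asyncio.run(main())
```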

View file

@ -1,12 +1,28 @@
from uuid import UUID
+from typing import Optional, List
from cognee.modules.pipelines.methods import get_pipeline_runs_by_dataset, reset_pipeline_run_status
from cognee.modules.pipelines.models.PipelineRun import PipelineRunStatus
from cognee.modules.users.models import User
-async def reset_dataset_pipeline_run_status(dataset_id: UUID, user: User):
+async def reset_dataset_pipeline_run_status(
+    dataset_id: UUID, user: User, pipeline_names: Optional[list[str]] = None
+):
+    """Reset the status of all (or selected) pipeline runs for a dataset.
+
+    If *pipeline_names* is given, only runs whose *pipeline_name* is in
+    that list are touched.
+    """
    related_pipeline_runs = await get_pipeline_runs_by_dataset(dataset_id)

    for pipeline_run in related_pipeline_runs:
-        if pipeline_run.status is not PipelineRunStatus.DATASET_PROCESSING_INITIATED:
-            await reset_pipeline_run_status(user.id, dataset_id, pipeline_run.pipeline_name)
+        # Skip runs that are initiated
+        if pipeline_run.status is PipelineRunStatus.DATASET_PROCESSING_INITIATED:
+            continue
+
+        # If a name filter is provided, skip non-matching runs
+        if pipeline_names is not None and pipeline_run.pipeline_name not in pipeline_names:
+            continue
+
+        await reset_pipeline_run_status(user.id, dataset_id, pipeline_run.pipeline_name)
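
A small hypothetical wrapper showing the new `pipeline_names` filter in isolation (it mirrors how the updated add endpoint resets only its own pipelines):

```python
# Hypothetical helper: reset only add/cognify runs, leaving e.g. a memify run untouched.
from uuid import UUID

from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import (
    reset_dataset_pipeline_run_status,
)
from cognee.modules.users.models import User


async def reset_add_and_cognify_runs(dataset_id: UUID, user: User) -> None:
    await reset_dataset_pipeline_run_status(
        dataset_id, user, pipeline_names=["add_pipeline", "cognify_pipeline"]
    )
    # Calling it without pipeline_names keeps the old behaviour and resets every non-initiated run.
```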

View file

@ -5,6 +5,7 @@ from typing import Union
from cognee.modules.pipelines.layers.setup_and_check_environment import (
setup_and_check_environment,
)
from cognee.shared.logging_utils import get_logger
from cognee.modules.data.methods.get_dataset_data import get_dataset_data
from cognee.modules.data.models import Data, Dataset

View file

@ -0,0 +1,22 @@
from cognee.shared.logging_utils import get_logger
from cognee.tasks.codingagents.coding_rule_associations import get_existing_rules

logger = get_logger("CodingRulesRetriever")


class CodingRulesRetriever:
    """Retriever for handling coding rule based searches."""

    def __init__(self, rules_nodeset_name="coding_agent_rules"):
        """Initialize retriever with search parameters."""
        if isinstance(rules_nodeset_name, list):
            if not rules_nodeset_name:
                # If there is no provided nodeset, default to coding_agent_rules
                rules_nodeset_name = ["coding_agent_rules"]
            rules_nodeset_name = rules_nodeset_name[0]

        self.rules_nodeset_name = rules_nodeset_name

    async def get_existing_rules(self, query_text):
        return await get_existing_rules(
            rules_nodeset_name=self.rules_nodeset_name, return_list=True
        )

View file

@ -5,6 +5,7 @@ import string
from cognee.infrastructure.engine import DataPoint
from cognee.tasks.storage import add_data_points
+from cognee.modules.graph.utils import resolve_edges_to_text
from cognee.modules.graph.utils.convert_node_to_data_point import get_all_subclasses
from cognee.modules.retrieval.base_retriever import BaseRetriever
from cognee.modules.retrieval.utils.brute_force_triplet_search import brute_force_triplet_search
@ -53,22 +54,6 @@ class GraphCompletionRetriever(BaseRetriever):
        self.node_type = node_type
        self.node_name = node_name

-    def _get_nodes(self, retrieved_edges: list) -> dict:
-        """Creates a dictionary of nodes with their names and content."""
-        nodes = {}
-        for edge in retrieved_edges:
-            for node in (edge.node1, edge.node2):
-                if node.id not in nodes:
-                    text = node.attributes.get("text")
-                    if text:
-                        name = self._get_title(text)
-                        content = text
-                    else:
-                        name = node.attributes.get("name", "Unnamed Node")
-                        content = node.attributes.get("description", name)
-                    nodes[node.id] = {"node": node, "name": name, "content": content}
-        return nodes

    async def resolve_edges_to_text(self, retrieved_edges: list) -> str:
        """
        Converts retrieved graph edges into a human-readable string format.
@ -83,16 +68,7 @@ class GraphCompletionRetriever(BaseRetriever):
            - str: A formatted string representation of the nodes and their connections.
        """
-        nodes = self._get_nodes(retrieved_edges)
-        node_section = "\n".join(
-            f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n"
-            for info in nodes.values()
-        )
-        connection_section = "\n".join(
-            f"{nodes[edge.node1.id]['name']} --[{edge.attributes['relationship_type']}]--> {nodes[edge.node2.id]['name']}"
-            for edge in retrieved_edges
-        )
-        return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}"
+        return await resolve_edges_to_text(retrieved_edges)

    async def get_triplets(self, query: str) -> list:
        """
@ -196,26 +172,6 @@ class GraphCompletionRetriever(BaseRetriever):
            return [completion]

-    def _top_n_words(self, text, stop_words=None, top_n=3, separator=", "):
-        """Concatenates the top N frequent words in text."""
-        if stop_words is None:
-            stop_words = DEFAULT_STOP_WORDS
-        words = [word.lower().strip(string.punctuation) for word in text.split()]
-        if stop_words:
-            words = [word for word in words if word and word not in stop_words]
-        top_words = [word for word, freq in Counter(words).most_common(top_n)]
-        return separator.join(top_words)
-
-    def _get_title(self, text: str, first_n_words: int = 7, top_n_words: int = 3) -> str:
-        """Creates a title, by combining first words with most frequent words from the text."""
-        first_n_words = text.split()[:first_n_words]
-        top_n_words = self._top_n_words(text, top_n=top_n_words)
-        return f"{' '.join(first_n_words)}... [{top_n_words}]"

    async def save_qa(self, question: str, answer: str, context: str, triplets: List) -> None:
        """
        Saves a question and answer pair for later analysis or storage.

View file

@ -14,6 +14,7 @@ from cognee.modules.retrieval.summaries_retriever import SummariesRetriever
from cognee.modules.retrieval.completion_retriever import CompletionRetriever
from cognee.modules.retrieval.graph_completion_retriever import GraphCompletionRetriever
from cognee.modules.retrieval.temporal_retriever import TemporalRetriever
+from cognee.modules.retrieval.coding_rules_retriever import CodingRulesRetriever
from cognee.modules.retrieval.graph_summary_completion_retriever import (
GraphSummaryCompletionRetriever,
)
@ -169,6 +170,9 @@ async def specific_search(
        SearchType.NATURAL_LANGUAGE: NaturalLanguageRetriever().get_completion,
        SearchType.FEEDBACK: UserQAFeedback(last_k=last_k).add_feedback,
        SearchType.TEMPORAL: TemporalRetriever(top_k=top_k).get_completion,
+        SearchType.CODING_RULES: CodingRulesRetriever(
+            rules_nodeset_name=node_name
+        ).get_existing_rules,
    }

    # If the query type is FEELING_LUCKY, select the search type intelligently

View file

@ -16,3 +16,4 @@ class SearchType(Enum):
    FEELING_LUCKY = "FEELING_LUCKY"
    FEEDBACK = "FEEDBACK"
    TEMPORAL = "TEMPORAL"
+    CODING_RULES = "CODING_RULES"
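
The new search type is queried through the regular top-level API; this mirrors the example script at the end of this PR:

```python
# Sketch: list the stored coding rules via the new CODING_RULES search type.
import asyncio

import cognee


async def list_rules():
    rules = await cognee.search(
        query_text="List me the coding rules",
        query_type=cognee.SearchType.CODING_RULES,
        node_name=["coding_agent_rules"],
    )
    for rule in rules:
        print("-", rule)


asyncio.run(list_rules())
```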

View file

View file

@ -0,0 +1,129 @@
from uuid import NAMESPACE_OID, uuid5
from cognee.infrastructure.databases.graph import get_graph_engine
from cognee.infrastructure.databases.vector import get_vector_engine
from cognee.low_level import DataPoint
from cognee.infrastructure.llm import LLMGateway
from cognee.shared.logging_utils import get_logger
from cognee.modules.engine.models import NodeSet
from cognee.tasks.storage import add_data_points, index_graph_edges
from typing import Optional, List, Any
from pydantic import Field

logger = get_logger("coding_rule_association")


class Rule(DataPoint):
    """A single developer rule extracted from text."""

    text: str = Field(..., description="The coding rule associated with the conversation")
    belongs_to_set: Optional[NodeSet] = None
    metadata: dict = {"index_fields": ["rule"]}


class RuleSet(DataPoint):
    """Collection of parsed rules."""

    rules: List[Rule] = Field(
        ...,
        description="List of developer rules extracted from the input text. Each rule represents a coding best practice or guideline.",
    )


async def get_existing_rules(rules_nodeset_name: str, return_list: bool = False) -> str:
    graph_engine = await get_graph_engine()

    nodes_data, _ = await graph_engine.get_nodeset_subgraph(
        node_type=NodeSet, node_name=[rules_nodeset_name]
    )

    existing_rules = [
        item[1]["text"]
        for item in nodes_data
        if isinstance(item, tuple)
        and len(item) == 2
        and isinstance(item[1], dict)
        and "text" in item[1]
    ]

    if not return_list:
        existing_rules = "\n".join(f"- {rule}" for rule in existing_rules)

    return existing_rules


async def get_origin_edges(data: str, rules: List[Rule]) -> list[Any]:
    vector_engine = get_vector_engine()
    origin_chunk = await vector_engine.search("DocumentChunk_text", data, limit=1)

    try:
        origin_id = origin_chunk[0].id
    except (AttributeError, KeyError, TypeError, IndexError):
        origin_id = None

    relationships = []
    if origin_id and isinstance(rules, (list, tuple)) and len(rules) > 0:
        for rule in rules:
            try:
                rule_id = getattr(rule, "id", None)
                if rule_id is not None:
                    rel_name = "rule_associated_from"
                    relationships.append(
                        (
                            rule_id,
                            origin_id,
                            rel_name,
                            {
                                "relationship_name": rel_name,
                                "source_node_id": rule_id,
                                "target_node_id": origin_id,
                                "ontology_valid": False,
                            },
                        )
                    )
            except Exception as e:
                logger.info(f"Warning: Skipping invalid rule due to error: {e}")
    else:
        logger.info("No valid origin_id or rules provided.")

    return relationships


async def add_rule_associations(
    data: str,
    rules_nodeset_name: str,
    user_prompt_location: str = "coding_rule_association_agent_user.txt",
    system_prompt_location: str = "coding_rule_association_agent_system.txt",
):
    if isinstance(data, list):
        # If data is a list of strings join all strings in list
        data = " ".join(data)

    graph_engine = await get_graph_engine()

    existing_rules = await get_existing_rules(rules_nodeset_name=rules_nodeset_name)

    user_context = {"chat": data, "rules": existing_rules}
    user_prompt = LLMGateway.render_prompt(user_prompt_location, context=user_context)
    system_prompt = LLMGateway.render_prompt(system_prompt_location, context={})

    rule_list = await LLMGateway.acreate_structured_output(
        text_input=user_prompt, system_prompt=system_prompt, response_model=RuleSet
    )

    rules_nodeset = NodeSet(
        id=uuid5(NAMESPACE_OID, name=rules_nodeset_name), name=rules_nodeset_name
    )

    for rule in rule_list.rules:
        rule.belongs_to_set = rules_nodeset

    edges_to_save = await get_origin_edges(data=data, rules=rule_list.rules)

    await add_data_points(data_points=rule_list.rules)

    if len(edges_to_save) > 0:
        await graph_engine.add_edges(edges_to_save)

    await index_graph_edges()
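
The helpers are also usable on their own; a short sketch of inspecting the rule store after a memify run (assumes cognee is configured and the dataset has already been memified):

```python
# Sketch: read back the stored coding rules directly.
import asyncio

from cognee.tasks.codingagents.coding_rule_associations import get_existing_rules


async def show_rules():
    # As a plain list of rule texts...
    print(await get_existing_rules("coding_agent_rules", return_list=True))

    # ...or as the "- rule" bulleted string that is injected into the LLM prompt.
    print(await get_existing_rules("coding_agent_rules"))


asyncio.run(show_rules())
```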

View file

@ -0,0 +1,2 @@
from .extract_subgraph import extract_subgraph
from .extract_subgraph_chunks import extract_subgraph_chunks

View file

@ -0,0 +1,7 @@
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph


async def extract_subgraph(subgraphs: list[CogneeGraph]):
    for subgraph in subgraphs:
        for edge in subgraph.edges:
            yield edge

View file

@ -0,0 +1,11 @@
from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph


async def extract_subgraph_chunks(subgraphs: list[CogneeGraph]):
    """
    Get all DocumentChunk nodes from the subgraphs and forward their text to the next task in the pipeline.
    """
    for subgraph in subgraphs:
        for node in subgraph.nodes.values():
            if node.attributes["type"] == "DocumentChunk":
                yield node.attributes["text"]
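
A rough sketch of how this generator is fed: memify wraps the projected graph in a single-element list, and each yielded chunk text becomes input for the enrichment task. The stub below only imitates the `nodes`/`attributes` interface for illustration:

```python
# Illustration only: a stub standing in for a projected CogneeGraph memory fragment.
import asyncio
from types import SimpleNamespace

from cognee.tasks.memify.extract_subgraph_chunks import extract_subgraph_chunks


async def demo():
    chunk = SimpleNamespace(attributes={"type": "DocumentChunk", "text": "Susan reviews all merges."})
    entity = SimpleNamespace(attributes={"type": "Entity", "name": "Susan"})
    fake_fragment = SimpleNamespace(nodes={"1": chunk, "2": entity})

    # memify passes [memory_fragment] as data; only DocumentChunk texts are forwarded.
    async for text in extract_subgraph_chunks([fake_fragment]):
        print(text)


asyncio.run(demo())
```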

View file

@ -0,0 +1,110 @@
import asyncio
import pathlib
import os

import cognee
from cognee import memify
from cognee.api.v1.visualize.visualize import visualize_graph
from cognee.shared.logging_utils import setup_logging, ERROR
from cognee.modules.pipelines.tasks.task import Task
from cognee.tasks.memify.extract_subgraph_chunks import extract_subgraph_chunks
from cognee.tasks.codingagents.coding_rule_associations import add_rule_associations

# Prerequisites:
# 1. Copy `.env.template` and rename it to `.env`.
# 2. Add your OpenAI API key to the `.env` file in the `LLM_API_KEY` field:
#    LLM_API_KEY = "your_key_here"


async def main():
    # Create a clean slate for cognee -- reset data and system state
    print("Resetting cognee data...")
    await cognee.prune.prune_data()
    await cognee.prune.prune_system(metadata=True)
    print("Data reset complete.\n")

    print("Adding conversation about rules to cognee:\n")
    coding_rules_chat_from_principal_engineer = """
    We want code to be formatted by PEP8 standards.
    Typing and Docstrings must be added.
    Please also make sure to write NOTE: on all more complex code segments.
    If there is any duplicate code, try to handle it in one function to avoid code duplication.
    Susan should also always review new code changes before merging to main.
    New releases should not happen on Friday so we don't have to fix them during the weekend.
    """
    print(
        f"Coding rules conversation with principal engineer: {coding_rules_chat_from_principal_engineer}"
    )

    coding_rules_chat_from_manager = """
    Susan should always review new code changes before merging to main.
    New releases should not happen on Friday so we don't have to fix them during the weekend.
    """
    print(f"Coding rules conversation with manager: {coding_rules_chat_from_manager}")

    # Add the text, and make it available for cognify
    await cognee.add([coding_rules_chat_from_principal_engineer, coding_rules_chat_from_manager])
    print("Text added successfully.\n")

    # Use LLMs and cognee to create knowledge graph
    await cognee.cognify()
    print("Cognify process complete.\n")

    # Visualize graph after cognification
    file_path = os.path.join(
        pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_only_cognify.html"
    )
    await visualize_graph(file_path)
    print(f"Open file to see graph visualization only after cognification: {file_path}\n")

    # After the graph is created, run a second pipeline that goes through the graph and
    # enhances it with specific coding rule nodes.

    # extract_subgraph_chunks is a function that returns all document chunks from the specified subgraphs
    # (if no subgraph is specified, the whole graph is sent through memify)
    subgraph_extraction_tasks = [Task(extract_subgraph_chunks)]

    # add_rule_associations is a function that processes coding rules from chunks and keeps track of
    # existing rules so duplicate rules won't be created. As a result of this processing, new Rule nodes
    # that capture the coding rules found in conversations are created in the graph.
    coding_rules_association_tasks = [
        Task(
            add_rule_associations,
            rules_nodeset_name="coding_agent_rules",
            task_config={"batch_size": 1},
        ),
    ]

    # Memify accepts these tasks and orchestrates forwarding of graph data through them (if data is not specified).
    # If data is explicitly specified in the arguments, that data is forwarded through the tasks instead.
    await memify(
        extraction_tasks=subgraph_extraction_tasks,
        enrichment_tasks=coding_rules_association_tasks,
    )

    # Find the new coding rules added to the graph through memify (created from the chat conversations between team members)
    coding_rules = await cognee.search(
        query_text="List me the coding rules",
        query_type=cognee.SearchType.CODING_RULES,
        node_name=["coding_agent_rules"],
    )
    print("Coding rules created by memify:")
    for coding_rule in coding_rules:
        print("- " + coding_rule)

    # Visualize new graph with added memify context
    file_path = os.path.join(
        pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_after_memify.html"
    )
    await visualize_graph(file_path)
    print(f"\nOpen file to see graph visualization after memify enhancement: {file_path}")


if __name__ == "__main__":
    logger = setup_logging(log_level=ERROR)

    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    try:
        loop.run_until_complete(main())
    finally:
        loop.run_until_complete(loop.shutdown_asyncgens())