diff --git a/cognee/__init__.py b/cognee/__init__.py
index 7aa6388d9..be5a16b3b 100644
--- a/cognee/__init__.py
+++ b/cognee/__init__.py
@@ -18,6 +18,7 @@ logger = setup_logging()
 from .api.v1.add import add
 from .api.v1.delete import delete
 from .api.v1.cognify import cognify
+from .modules.memify import memify
 from .api.v1.config.config import config
 from .api.v1.datasets.datasets import datasets
 from .api.v1.prune import prune
diff --git a/cognee/api/client.py b/cognee/api/client.py
index 7588638c3..69b774909 100644
--- a/cognee/api/client.py
+++ b/cognee/api/client.py
@@ -22,6 +22,7 @@ from cognee.api.v1.settings.routers import get_settings_router
 from cognee.api.v1.datasets.routers import get_datasets_router
 from cognee.api.v1.cognify.routers import get_code_pipeline_router, get_cognify_router
 from cognee.api.v1.search.routers import get_search_router
+from cognee.api.v1.memify.routers import get_memify_router
 from cognee.api.v1.add.routers import get_add_router
 from cognee.api.v1.delete.routers import get_delete_router
 from cognee.api.v1.responses.routers import get_responses_router
@@ -235,6 +236,8 @@ app.include_router(get_add_router(), prefix="/api/v1/add", tags=["add"])
 
 app.include_router(get_cognify_router(), prefix="/api/v1/cognify", tags=["cognify"])
 
+app.include_router(get_memify_router(), prefix="/api/v1/memify", tags=["memify"])
+
 app.include_router(get_search_router(), prefix="/api/v1/search", tags=["search"])
 
 app.include_router(
diff --git a/cognee/api/v1/add/add.py b/cognee/api/v1/add/add.py
index 98771947c..eeb867984 100644
--- a/cognee/api/v1/add/add.py
+++ b/cognee/api/v1/add/add.py
@@ -150,7 +150,9 @@ async def add(
 
     user, authorized_dataset = await resolve_authorized_user_dataset(dataset_id, dataset_name, user)
 
-    await reset_dataset_pipeline_run_status(authorized_dataset.id, user)
+    await reset_dataset_pipeline_run_status(
+        authorized_dataset.id, user, pipeline_names=["add_pipeline", "cognify_pipeline"]
+    )
 
     pipeline_run_info = None
diff --git a/cognee/api/v1/add/routers/get_add_router.py b/cognee/api/v1/add/routers/get_add_router.py
index 1703d9931..dfa7d275b 100644
--- a/cognee/api/v1/add/routers/get_add_router.py
+++ b/cognee/api/v1/add/routers/get_add_router.py
@@ -1,6 +1,3 @@
-import os
-import requests
-import subprocess
 from uuid import UUID
 
 from fastapi import APIRouter
@@ -24,6 +21,7 @@ def get_add_router() -> APIRouter:
     async def add(
         data: List[UploadFile] = File(default=None),
         datasetName: Optional[str] = Form(default=None),
+        # Note: Literal is needed for Swagger use
         datasetId: Union[UUID, Literal[""], None] = Form(default=None, examples=[""]),
         node_set: Optional[List[str]] = Form(default=[""], example=[""]),
         user: User = Depends(get_authenticated_user),
@@ -60,9 +58,6 @@ def get_add_router() -> APIRouter:
 
         ## Notes
         - To add data to datasets not owned by the user, use dataset_id (when ENABLE_BACKEND_ACCESS_CONTROL is set to True)
-        - GitHub repositories are cloned and all files are processed
-        - HTTP URLs are fetched and their content is processed
-        - The ALLOW_HTTP_REQUESTS environment variable controls URL processing
         - datasetId value can only be the UUID of an already existing dataset
         """
         send_telemetry(
diff --git a/cognee/api/v1/memify/__init__.py b/cognee/api/v1/memify/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/cognee/api/v1/memify/routers/__init__.py b/cognee/api/v1/memify/routers/__init__.py
new file mode 100644
index 000000000..1d1793c35
--- /dev/null
+++ b/cognee/api/v1/memify/routers/__init__.py
@@ -0,0 +1 @@
+from .get_memify_router import get_memify_router
diff --git a/cognee/api/v1/memify/routers/get_memify_router.py b/cognee/api/v1/memify/routers/get_memify_router.py
new file mode 100644
index 000000000..1976d7414
--- /dev/null
+++ b/cognee/api/v1/memify/routers/get_memify_router.py
@@ -0,0 +1,100 @@
+from uuid import UUID
+
+from fastapi import APIRouter
+from fastapi.responses import JSONResponse
+from fastapi import Depends
+from pydantic import Field
+from typing import List, Optional, Union, Literal
+
+from cognee.api.DTO import InDTO
+from cognee.modules.users.models import User
+from cognee.modules.users.methods import get_authenticated_user
+from cognee.shared.utils import send_telemetry
+from cognee.modules.pipelines.models import PipelineRunErrored
+from cognee.shared.logging_utils import get_logger
+
+logger = get_logger()
+
+
+class MemifyPayloadDTO(InDTO):
+    extraction_tasks: Optional[List[str]] = Field(
+        default=None,
+        examples=[[]],
+    )
+    enrichment_tasks: Optional[List[str]] = Field(default=None, examples=[[]])
+    data: Optional[str] = Field(default="")
+    dataset_name: Optional[str] = Field(default=None)
+    # Note: Literal is needed for Swagger use
+    dataset_id: Union[UUID, Literal[""], None] = Field(default=None, examples=[""])
+    node_name: Optional[List[str]] = Field(default=None, examples=[[]])
+    run_in_background: Optional[bool] = Field(default=False)
+
+
+def get_memify_router() -> APIRouter:
+    router = APIRouter()
+
+    @router.post("", response_model=dict)
+    async def memify(payload: MemifyPayloadDTO, user: User = Depends(get_authenticated_user)):
+        """
+        Enrichment pipeline in Cognee that can work with already built graphs. If no data is
+        provided, the existing knowledge graph is used as data; custom data can also be provided
+        instead and processed with the given extraction and enrichment tasks.
+
+        The provided tasks and data are arranged into a Cognee pipeline that executes graph
+        enrichment/creation.
+
+        ## Request Parameters
+        - **extractionTasks** (Optional[List[str]]): List of available Cognee Tasks to execute for graph/data extraction.
+        - **enrichmentTasks** (Optional[List[str]]): List of available Cognee Tasks to handle enrichment of the graph/data produced by the extraction tasks.
+        - **data** (Optional[str]): The data to ingest. Can be any text data when custom extraction and enrichment tasks are used.
+          Data provided here will be forwarded to the first extraction task in the pipeline as input.
+          If no data is provided, the whole graph (or a subgraph if nodeName is specified) will be forwarded.
+        - **datasetName** (Optional[str]): Name of the dataset to memify.
+        - **datasetId** (Optional[UUID]): UUID of an already existing dataset.
+        - **nodeName** (Optional[List[str]]): Filter graph to specific named entities (for targeted search). Used when no data is provided.
+        - **runInBackground** (Optional[bool]): Whether to execute processing asynchronously. Defaults to False (blocking).
+
+        Either datasetName or datasetId must be provided.
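For reviewers who want to try the new endpoint, a minimal request sketch. It assumes a local cognee API server on localhost:8000 and a valid auth token (both hypothetical); the camelCase field names follow the docstring's parameter list, and empty task lists fall back to the default coding-rules tasks:

    import asyncio
    import httpx

    async def call_memify():
        # Hypothetical host and auth header; adjust to your deployment.
        async with httpx.AsyncClient(base_url="http://localhost:8000") as client:
            response = await client.post(
                "/api/v1/memify",
                json={
                    "extractionTasks": [],  # empty -> default extract_subgraph_chunks task
                    "enrichmentTasks": [],  # empty -> default add_rule_associations task
                    "datasetName": "main_dataset",
                    "runInBackground": False,
                },
                headers={"Authorization": "Bearer <token>"},
            )
            print(response.status_code, response.json())

    asyncio.run(call_memify())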
+
+        ## Response
+        Returns information about the memify operation containing:
+        - Status of the operation
+        - Details about the processed data
+        - Any relevant metadata from the ingestion process
+
+        ## Error Codes
+        - **400 Bad Request**: Neither datasetId nor datasetName provided
+        - **409 Conflict**: Error during memify operation
+        - **403 Forbidden**: User doesn't have permission to use dataset
+
+        ## Notes
+        - To memify datasets not owned by the user, use datasetId (when ENABLE_BACKEND_ACCESS_CONTROL is set to True)
+        - datasetId value can only be the UUID of an already existing dataset
+        """
+
+        send_telemetry(
+            "Memify API Endpoint Invoked",
+            user.id,
+            additional_properties={"endpoint": "POST /v1/memify"},
+        )
+
+        if not payload.dataset_id and not payload.dataset_name:
+            raise ValueError("Either datasetId or datasetName must be provided.")
+
+        try:
+            from cognee.modules.memify import memify as cognee_memify
+
+            memify_run = await cognee_memify(
+                extraction_tasks=payload.extraction_tasks,
+                enrichment_tasks=payload.enrichment_tasks,
+                data=payload.data,
+                dataset=payload.dataset_id if payload.dataset_id else payload.dataset_name,
+                node_name=payload.node_name,
+                user=user,
+            )
+
+            if isinstance(memify_run, PipelineRunErrored):
+                return JSONResponse(status_code=420, content=memify_run)
+            return memify_run
+        except Exception as error:
+            return JSONResponse(status_code=409, content={"error": str(error)})
+
+    return router
diff --git a/cognee/modules/data/methods/load_or_create_datasets.py b/cognee/modules/data/methods/load_or_create_datasets.py
index 1d6ef3efb..2c9a6497c 100644
--- a/cognee/modules/data/methods/load_or_create_datasets.py
+++ b/cognee/modules/data/methods/load_or_create_datasets.py
@@ -2,7 +2,7 @@ from typing import List, Union
 from uuid import UUID
 
 from cognee.modules.data.models import Dataset
-from cognee.modules.data.methods import create_authorized_dataset
+from cognee.modules.data.methods.create_authorized_dataset import create_authorized_dataset
 from cognee.modules.data.exceptions import DatasetNotFoundError
diff --git a/cognee/modules/graph/cognee_graph/CogneeGraph.py b/cognee/modules/graph/cognee_graph/CogneeGraph.py
index 924532ce0..acfe04de7 100644
--- a/cognee/modules/graph/cognee_graph/CogneeGraph.py
+++ b/cognee/modules/graph/cognee_graph/CogneeGraph.py
@@ -76,7 +76,7 @@ class CogneeGraph(CogneeAbstractGraph):
             start_time = time.time()
 
             # Determine projection strategy
-            if node_type is not None and node_name not in [None, []]:
+            if node_type is not None and node_name not in [None, [], ""]:
                 nodes_data, edges_data = await adapter.get_nodeset_subgraph(
                     node_type=node_type, node_name=node_name
                 )
diff --git a/cognee/modules/graph/utils/__init__.py b/cognee/modules/graph/utils/__init__.py
index d1cda2d83..ebc648495 100644
--- a/cognee/modules/graph/utils/__init__.py
+++ b/cognee/modules/graph/utils/__init__.py
@@ -4,3 +4,4 @@ from .get_model_instance_from_graph import get_model_instance_from_graph
 from .retrieve_existing_edges import retrieve_existing_edges
 from .convert_node_to_data_point import convert_node_to_data_point
 from .deduplicate_nodes_and_edges import deduplicate_nodes_and_edges
+from .resolve_edges_to_text import resolve_edges_to_text
diff --git a/cognee/modules/graph/utils/resolve_edges_to_text.py b/cognee/modules/graph/utils/resolve_edges_to_text.py
new file mode 100644
index 000000000..56c303abc
--- /dev/null
+++ b/cognee/modules/graph/utils/resolve_edges_to_text.py
@@ -0,0 +1,67 @@
+async def resolve_edges_to_text(retrieved_edges: list) -> str:
+    """
+    Converts retrieved graph edges into a human-readable string format.
+
+    Parameters:
+    -----------
+
+    - retrieved_edges (list): A list of edges retrieved from the graph.
+
+    Returns:
+    --------
+
+    - str: A formatted string representation of the nodes and their connections.
+    """
+
+    def _get_nodes(retrieved_edges: list) -> dict:
+        """Creates a dictionary of nodes with their names and content."""
+
+        def _get_title(text: str, first_n_words: int = 7, top_n_words: int = 3) -> str:
+            """Creates a title by combining the first words with the most frequent words from the text."""
+
+            def _top_n_words(text, stop_words=None, top_n=3, separator=", "):
+                """Concatenates the top N frequent words in text."""
+                if stop_words is None:
+                    from cognee.modules.retrieval.utils.stop_words import DEFAULT_STOP_WORDS
+
+                    stop_words = DEFAULT_STOP_WORDS
+
+                import string
+
+                words = [word.lower().strip(string.punctuation) for word in text.split()]
+
+                if stop_words:
+                    words = [word for word in words if word and word not in stop_words]
+
+                from collections import Counter
+
+                top_words = [word for word, freq in Counter(words).most_common(top_n)]
+
+                return separator.join(top_words)
+
+            first_words = text.split()[:first_n_words]
+            top_words = _top_n_words(text, top_n=top_n_words)
+            return f"{' '.join(first_words)}... [{top_words}]"
+
+        nodes = {}
+        for edge in retrieved_edges:
+            for node in (edge.node1, edge.node2):
+                if node.id not in nodes:
+                    text = node.attributes.get("text")
+                    if text:
+                        name = _get_title(text)
+                        content = text
+                    else:
+                        name = node.attributes.get("name", "Unnamed Node")
+                        content = node.attributes.get("description", name)
+                    nodes[node.id] = {"node": node, "name": name, "content": content}
+        return nodes
+
+    nodes = _get_nodes(retrieved_edges)
+    node_section = "\n".join(
+        f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n"
+        for info in nodes.values()
+    )
+    connection_section = "\n".join(
+        f"{nodes[edge.node1.id]['name']} --[{edge.attributes['relationship_type']}]--> {nodes[edge.node2.id]['name']}"
+        for edge in retrieved_edges
+    )
+    return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}"
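A quick way to see the format this helper produces, using duck-typed stand-ins for graph nodes and edges (only the attributes the function reads are mocked; real inputs come from CogneeGraph edges):

    import asyncio
    from types import SimpleNamespace

    from cognee.modules.graph.utils import resolve_edges_to_text

    alice = SimpleNamespace(id="n1", attributes={"name": "Alice", "description": "An engineer"})
    chunk = SimpleNamespace(id="n2", attributes={"text": "Code must be formatted by PEP8 standards."})
    edge = SimpleNamespace(node1=alice, node2=chunk, attributes={"relationship_type": "follows"})

    print(asyncio.run(resolve_edges_to_text([edge])))
    # Prints a "Nodes:" section where each node's content is wrapped in
    # __node_content_start__/__node_content_end__ markers, then a "Connections:" section like:
    # Alice --[follows]--> Code must be formatted by PEP8 standards.... [<top words>]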
""" + Converts retrieved graph edges into a human-readable string format. + + Parameters: + ----------- + + - retrieved_edges (list): A list of edges retrieved from the graph. + + Returns: + -------- + + - str: A formatted string representation of the nodes and their connections. + """ + + def _get_nodes(retrieved_edges: list) -> dict: + def _get_title(text: str, first_n_words: int = 7, top_n_words: int = 3) -> str: + def _top_n_words(text, stop_words=None, top_n=3, separator=", "): + """Concatenates the top N frequent words in text.""" + if stop_words is None: + from cognee.modules.retrieval.utils.stop_words import DEFAULT_STOP_WORDS + + stop_words = DEFAULT_STOP_WORDS + + import string + + words = [word.lower().strip(string.punctuation) for word in text.split()] + + if stop_words: + words = [word for word in words if word and word not in stop_words] + + from collections import Counter + + top_words = [word for word, freq in Counter(words).most_common(top_n)] + + return separator.join(top_words) + + """Creates a title, by combining first words with most frequent words from the text.""" + first_n_words = text.split()[:first_n_words] + top_n_words = _top_n_words(text, top_n=top_n_words) + return f"{' '.join(first_n_words)}... [{top_n_words}]" + + """Creates a dictionary of nodes with their names and content.""" + nodes = {} + for edge in retrieved_edges: + for node in (edge.node1, edge.node2): + if node.id not in nodes: + text = node.attributes.get("text") + if text: + name = _get_title(text) + content = text + else: + name = node.attributes.get("name", "Unnamed Node") + content = node.attributes.get("description", name) + nodes[node.id] = {"node": node, "name": name, "content": content} + return nodes + + nodes = _get_nodes(retrieved_edges) + node_section = "\n".join( + f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n" + for info in nodes.values() + ) + connection_section = "\n".join( + f"{nodes[edge.node1.id]['name']} --[{edge.attributes['relationship_type']}]--> {nodes[edge.node2.id]['name']}" + for edge in retrieved_edges + ) + return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}" diff --git a/cognee/modules/memify/__init__.py b/cognee/modules/memify/__init__.py new file mode 100644 index 000000000..90aaa8404 --- /dev/null +++ b/cognee/modules/memify/__init__.py @@ -0,0 +1 @@ +from .memify import memify diff --git a/cognee/modules/memify/memify.py b/cognee/modules/memify/memify.py new file mode 100644 index 000000000..2d9b32a1b --- /dev/null +++ b/cognee/modules/memify/memify.py @@ -0,0 +1,118 @@ +from typing import Union, Optional, List, Type, Any +from uuid import UUID + +from cognee.shared.logging_utils import get_logger + +from cognee.modules.retrieval.utils.brute_force_triplet_search import get_memory_fragment +from cognee.context_global_variables import set_database_global_context_variables +from cognee.modules.engine.models.node_set import NodeSet +from cognee.modules.pipelines import run_pipeline +from cognee.modules.pipelines.tasks.task import Task +from cognee.modules.users.models import User +from cognee.modules.pipelines.layers.resolve_authorized_user_datasets import ( + resolve_authorized_user_datasets, +) +from cognee.modules.pipelines.layers.reset_dataset_pipeline_run_status import ( + reset_dataset_pipeline_run_status, +) +from cognee.modules.engine.operations.setup import setup +from cognee.modules.pipelines.layers.pipeline_execution_mode import get_pipeline_executor +from 
diff --git a/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py b/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py
index cc72a6e51..bc59f9a6b 100644
--- a/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py
+++ b/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py
@@ -1,12 +1,28 @@
 from uuid import UUID
+from typing import Optional, List
+
 from cognee.modules.pipelines.methods import get_pipeline_runs_by_dataset, reset_pipeline_run_status
 from cognee.modules.pipelines.models.PipelineRun import PipelineRunStatus
 from cognee.modules.users.models import User
 
 
-async def reset_dataset_pipeline_run_status(dataset_id: UUID, user: User):
+async def reset_dataset_pipeline_run_status(
+    dataset_id: UUID, user: User, pipeline_names: Optional[List[str]] = None
+):
+    """Reset the status of all (or selected) pipeline runs for a dataset.
+
+    If *pipeline_names* is given, only runs whose *pipeline_name* is in
+    that list are touched.
+    """
     related_pipeline_runs = await get_pipeline_runs_by_dataset(dataset_id)
 
     for pipeline_run in related_pipeline_runs:
-        if pipeline_run.status is not PipelineRunStatus.DATASET_PROCESSING_INITIATED:
-            await reset_pipeline_run_status(user.id, dataset_id, pipeline_run.pipeline_name)
+        # Skip runs that are currently initiated
+        if pipeline_run.status is PipelineRunStatus.DATASET_PROCESSING_INITIATED:
+            continue
+
+        # If a name filter is provided, skip non-matching runs
+        if pipeline_names is not None and pipeline_run.pipeline_name not in pipeline_names:
+            continue
+
+        await reset_pipeline_run_status(user.id, dataset_id, pipeline_run.pipeline_name)
+ """ + + # Use default coding rules tasks if no tasks were provided + if not extraction_tasks: + extraction_tasks = [Task(extract_subgraph_chunks)] + if not enrichment_tasks: + enrichment_tasks = [ + Task( + add_rule_associations, + rules_nodeset_name="coding_agent_rules", + task_config={"batch_size": 1}, + ) + ] + + await setup() + + user, authorized_dataset_list = await resolve_authorized_user_datasets(dataset, user) + authorized_dataset = authorized_dataset_list[0] + + if not data: + # Will only be used if ENABLE_BACKEND_ACCESS_CONTROL is set to True + await set_database_global_context_variables( + authorized_dataset.id, authorized_dataset.owner_id + ) + + memory_fragment = await get_memory_fragment(node_type=node_type, node_name=node_name) + # Subgraphs should be a single element in the list to represent one data item + data = [memory_fragment] + + memify_tasks = [ + *extraction_tasks, # Unpack tasks provided to memify pipeline + *enrichment_tasks, + ] + + await reset_dataset_pipeline_run_status( + authorized_dataset.id, user, pipeline_names=["memify_pipeline"] + ) + + # By calling get pipeline executor we get a function that will have the run_pipeline run in the background or a function that we will need to wait for + pipeline_executor_func = get_pipeline_executor(run_in_background=run_in_background) + + # Run the run_pipeline in the background or blocking based on executor + return await pipeline_executor_func( + pipeline=run_pipeline, + tasks=memify_tasks, + user=user, + data=data, + datasets=authorized_dataset.id, + vector_db_config=vector_db_config, + graph_db_config=graph_db_config, + incremental_loading=False, + pipeline_name="memify_pipeline", + ) diff --git a/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py b/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py index cc72a6e51..bc59f9a6b 100644 --- a/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py +++ b/cognee/modules/pipelines/layers/reset_dataset_pipeline_run_status.py @@ -1,12 +1,28 @@ from uuid import UUID +from typing import Optional, List + from cognee.modules.pipelines.methods import get_pipeline_runs_by_dataset, reset_pipeline_run_status from cognee.modules.pipelines.models.PipelineRun import PipelineRunStatus from cognee.modules.users.models import User -async def reset_dataset_pipeline_run_status(dataset_id: UUID, user: User): +async def reset_dataset_pipeline_run_status( + dataset_id: UUID, user: User, pipeline_names: Optional[list[str]] = None +): + """Reset the status of all (or selected) pipeline runs for a dataset. + + If *pipeline_names* is given, only runs whose *pipeline_name* is in + that list are touched. 
+ """ related_pipeline_runs = await get_pipeline_runs_by_dataset(dataset_id) for pipeline_run in related_pipeline_runs: - if pipeline_run.status is not PipelineRunStatus.DATASET_PROCESSING_INITIATED: - await reset_pipeline_run_status(user.id, dataset_id, pipeline_run.pipeline_name) + # Skip runs that are initiated + if pipeline_run.status is PipelineRunStatus.DATASET_PROCESSING_INITIATED: + continue + + # If a name filter is provided, skip non-matching runs + if pipeline_names is not None and pipeline_run.pipeline_name not in pipeline_names: + continue + + await reset_pipeline_run_status(user.id, dataset_id, pipeline_run.pipeline_name) diff --git a/cognee/modules/pipelines/operations/pipeline.py b/cognee/modules/pipelines/operations/pipeline.py index cbe6dee5c..b59a171f7 100644 --- a/cognee/modules/pipelines/operations/pipeline.py +++ b/cognee/modules/pipelines/operations/pipeline.py @@ -5,6 +5,7 @@ from typing import Union from cognee.modules.pipelines.layers.setup_and_check_environment import ( setup_and_check_environment, ) + from cognee.shared.logging_utils import get_logger from cognee.modules.data.methods.get_dataset_data import get_dataset_data from cognee.modules.data.models import Data, Dataset diff --git a/cognee/modules/retrieval/coding_rules_retriever.py b/cognee/modules/retrieval/coding_rules_retriever.py new file mode 100644 index 000000000..364ff3236 --- /dev/null +++ b/cognee/modules/retrieval/coding_rules_retriever.py @@ -0,0 +1,22 @@ +from cognee.shared.logging_utils import get_logger +from cognee.tasks.codingagents.coding_rule_associations import get_existing_rules + +logger = get_logger("CodingRulesRetriever") + + +class CodingRulesRetriever: + """Retriever for handling codeing rule based searches.""" + + def __init__(self, rules_nodeset_name="coding_agent_rules"): + if isinstance(rules_nodeset_name, list): + if not rules_nodeset_name: + # If there is no provided nodeset set to coding_agent_rules + rules_nodeset_name = ["coding_agent_rules"] + rules_nodeset_name = rules_nodeset_name[0] + self.rules_nodeset_name = rules_nodeset_name + """Initialize retriever with search parameters.""" + + async def get_existing_rules(self, query_text): + return await get_existing_rules( + rules_nodeset_name=self.rules_nodeset_name, return_list=True + ) diff --git a/cognee/modules/retrieval/graph_completion_retriever.py b/cognee/modules/retrieval/graph_completion_retriever.py index 6a5193c56..bc4fa27b3 100644 --- a/cognee/modules/retrieval/graph_completion_retriever.py +++ b/cognee/modules/retrieval/graph_completion_retriever.py @@ -5,6 +5,7 @@ import string from cognee.infrastructure.engine import DataPoint from cognee.tasks.storage import add_data_points +from cognee.modules.graph.utils import resolve_edges_to_text from cognee.modules.graph.utils.convert_node_to_data_point import get_all_subclasses from cognee.modules.retrieval.base_retriever import BaseRetriever from cognee.modules.retrieval.utils.brute_force_triplet_search import brute_force_triplet_search @@ -53,22 +54,6 @@ class GraphCompletionRetriever(BaseRetriever): self.node_type = node_type self.node_name = node_name - def _get_nodes(self, retrieved_edges: list) -> dict: - """Creates a dictionary of nodes with their names and content.""" - nodes = {} - for edge in retrieved_edges: - for node in (edge.node1, edge.node2): - if node.id not in nodes: - text = node.attributes.get("text") - if text: - name = self._get_title(text) - content = text - else: - name = node.attributes.get("name", "Unnamed Node") - content = 
node.attributes.get("description", name) - nodes[node.id] = {"node": node, "name": name, "content": content} - return nodes - async def resolve_edges_to_text(self, retrieved_edges: list) -> str: """ Converts retrieved graph edges into a human-readable string format. @@ -83,16 +68,7 @@ class GraphCompletionRetriever(BaseRetriever): - str: A formatted string representation of the nodes and their connections. """ - nodes = self._get_nodes(retrieved_edges) - node_section = "\n".join( - f"Node: {info['name']}\n__node_content_start__\n{info['content']}\n__node_content_end__\n" - for info in nodes.values() - ) - connection_section = "\n".join( - f"{nodes[edge.node1.id]['name']} --[{edge.attributes['relationship_type']}]--> {nodes[edge.node2.id]['name']}" - for edge in retrieved_edges - ) - return f"Nodes:\n{node_section}\n\nConnections:\n{connection_section}" + return await resolve_edges_to_text(retrieved_edges) async def get_triplets(self, query: str) -> list: """ @@ -196,26 +172,6 @@ class GraphCompletionRetriever(BaseRetriever): return [completion] - def _top_n_words(self, text, stop_words=None, top_n=3, separator=", "): - """Concatenates the top N frequent words in text.""" - if stop_words is None: - stop_words = DEFAULT_STOP_WORDS - - words = [word.lower().strip(string.punctuation) for word in text.split()] - - if stop_words: - words = [word for word in words if word and word not in stop_words] - - top_words = [word for word, freq in Counter(words).most_common(top_n)] - - return separator.join(top_words) - - def _get_title(self, text: str, first_n_words: int = 7, top_n_words: int = 3) -> str: - """Creates a title, by combining first words with most frequent words from the text.""" - first_n_words = text.split()[:first_n_words] - top_n_words = self._top_n_words(text, top_n=top_n_words) - return f"{' '.join(first_n_words)}... [{top_n_words}]" - async def save_qa(self, question: str, answer: str, context: str, triplets: List) -> None: """ Saves a question and answer pair for later analysis or storage. 
diff --git a/cognee/tasks/codingagents/coding_rule_associations.py b/cognee/tasks/codingagents/coding_rule_associations.py
new file mode 100644
index 000000000..c809bc68f
--- /dev/null
+++ b/cognee/tasks/codingagents/coding_rule_associations.py
@@ -0,0 +1,129 @@
+from uuid import NAMESPACE_OID, uuid5
+
+from cognee.infrastructure.databases.graph import get_graph_engine
+from cognee.infrastructure.databases.vector import get_vector_engine
+
+from cognee.low_level import DataPoint
+from cognee.infrastructure.llm import LLMGateway
+from cognee.shared.logging_utils import get_logger
+from cognee.modules.engine.models import NodeSet
+from cognee.tasks.storage import add_data_points, index_graph_edges
+from typing import Optional, List, Any, Union
+from pydantic import Field
+
+logger = get_logger("coding_rule_association")
+
+
+class Rule(DataPoint):
+    """A single developer rule extracted from text."""
+
+    text: str = Field(..., description="The coding rule associated with the conversation")
+    belongs_to_set: Optional[NodeSet] = None
+    metadata: dict = {"index_fields": ["text"]}
+
+
+class RuleSet(DataPoint):
+    """Collection of parsed rules."""
+
+    rules: List[Rule] = Field(
+        ...,
+        description="List of developer rules extracted from the input text. Each rule represents a coding best practice or guideline.",
+    )
+
+
+async def get_existing_rules(rules_nodeset_name: str, return_list: bool = False) -> Union[str, List[str]]:
+    """Return the rule texts already stored in the given nodeset, as a list or a bulleted string."""
+    graph_engine = await get_graph_engine()
+    nodes_data, _ = await graph_engine.get_nodeset_subgraph(
+        node_type=NodeSet, node_name=[rules_nodeset_name]
+    )
+
+    existing_rules = [
+        item[1]["text"]
+        for item in nodes_data
+        if isinstance(item, tuple)
+        and len(item) == 2
+        and isinstance(item[1], dict)
+        and "text" in item[1]
+    ]
+
+    if not return_list:
+        existing_rules = "\n".join(f"- {rule}" for rule in existing_rules)
+
+    return existing_rules
+
+
+async def get_origin_edges(data: str, rules: List[Rule]) -> List[Any]:
+    """Build edges linking each extracted rule to the document chunk it originated from."""
+    vector_engine = get_vector_engine()
+
+    origin_chunk = await vector_engine.search("DocumentChunk_text", data, limit=1)
+
+    try:
+        origin_id = origin_chunk[0].id
+    except (AttributeError, KeyError, TypeError, IndexError):
+        origin_id = None
+
+    relationships = []
+
+    if origin_id and isinstance(rules, (list, tuple)) and len(rules) > 0:
+        for rule in rules:
+            try:
+                rule_id = getattr(rule, "id", None)
+                if rule_id is not None:
+                    rel_name = "rule_associated_from"
+                    relationships.append(
+                        (
+                            rule_id,
+                            origin_id,
+                            rel_name,
+                            {
+                                "relationship_name": rel_name,
+                                "source_node_id": rule_id,
+                                "target_node_id": origin_id,
+                                "ontology_valid": False,
+                            },
+                        )
+                    )
+            except Exception as e:
+                logger.warning(f"Skipping invalid rule due to error: {e}")
+    else:
+        logger.info("No valid origin_id or rules provided.")
+
+    return relationships
+
+
+async def add_rule_associations(
+    data: str,
+    rules_nodeset_name: str,
+    user_prompt_location: str = "coding_rule_association_agent_user.txt",
+    system_prompt_location: str = "coding_rule_association_agent_system.txt",
+):
+    """Extract coding rules from text via the LLM and attach them to the rules nodeset in the graph."""
+    if isinstance(data, list):
+        # If data is a list of strings, join them into a single string
+        data = " ".join(data)
+
+    graph_engine = await get_graph_engine()
+    existing_rules = await get_existing_rules(rules_nodeset_name=rules_nodeset_name)
+
+    user_context = {"chat": data, "rules": existing_rules}
+
+    user_prompt = LLMGateway.render_prompt(user_prompt_location, context=user_context)
+    system_prompt = LLMGateway.render_prompt(system_prompt_location, context={})
+
+    rule_list = await LLMGateway.acreate_structured_output(
+        text_input=user_prompt, system_prompt=system_prompt, response_model=RuleSet
+    )
+
+    rules_nodeset = NodeSet(
+        id=uuid5(NAMESPACE_OID, name=rules_nodeset_name), name=rules_nodeset_name
+    )
+    for rule in rule_list.rules:
+        rule.belongs_to_set = rules_nodeset
+
+    edges_to_save = await get_origin_edges(data=data, rules=rule_list.rules)
+
+    await add_data_points(data_points=rule_list.rules)
+
+    if len(edges_to_save) > 0:
+        await graph_engine.add_edges(edges_to_save)
+
+    await index_graph_edges()
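For reference, the tuples get_origin_edges builds follow the (source_id, target_id, relationship_name, properties) shape consumed by graph_engine.add_edges; a constructed example with hypothetical IDs:

    from uuid import uuid4

    rule_id, origin_id = uuid4(), uuid4()  # hypothetical Rule and DocumentChunk node IDs
    edge = (
        rule_id,
        origin_id,
        "rule_associated_from",
        {
            "relationship_name": "rule_associated_from",
            "source_node_id": rule_id,
            "target_node_id": origin_id,
            "ontology_valid": False,
        },
    )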
diff --git a/cognee/tasks/memify/__init__.py b/cognee/tasks/memify/__init__.py
new file mode 100644
index 000000000..692bac443
--- /dev/null
+++ b/cognee/tasks/memify/__init__.py
@@ -0,0 +1,2 @@
+from .extract_subgraph import extract_subgraph
+from .extract_subgraph_chunks import extract_subgraph_chunks
diff --git a/cognee/tasks/memify/extract_subgraph.py b/cognee/tasks/memify/extract_subgraph.py
new file mode 100644
index 000000000..d6ca3773f
--- /dev/null
+++ b/cognee/tasks/memify/extract_subgraph.py
@@ -0,0 +1,7 @@
+from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
+
+
+async def extract_subgraph(subgraphs: list[CogneeGraph]):
+    """Yield all edges from the given subgraphs to the next task in the pipeline."""
+    for subgraph in subgraphs:
+        for edge in subgraph.edges:
+            yield edge
diff --git a/cognee/tasks/memify/extract_subgraph_chunks.py b/cognee/tasks/memify/extract_subgraph_chunks.py
new file mode 100644
index 000000000..9aab498d7
--- /dev/null
+++ b/cognee/tasks/memify/extract_subgraph_chunks.py
@@ -0,0 +1,11 @@
+from cognee.modules.graph.cognee_graph.CogneeGraph import CogneeGraph
+
+
+async def extract_subgraph_chunks(subgraphs: list[CogneeGraph]):
+    """
+    Get all DocumentChunk nodes from the subgraphs and forward their text to the next task in the pipeline.
+    """
+    for subgraph in subgraphs:
+        for node in subgraph.nodes.values():
+            if node.attributes["type"] == "DocumentChunk":
+                yield node.attributes["text"]
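A quick check of the generator's behavior with a duck-typed stand-in for CogneeGraph (only the attributes the task reads are mocked; nodes of other types are skipped):

    import asyncio
    from types import SimpleNamespace

    from cognee.tasks.memify.extract_subgraph_chunks import extract_subgraph_chunks

    chunk = SimpleNamespace(attributes={"type": "DocumentChunk", "text": "Review code before merging."})
    entity = SimpleNamespace(attributes={"type": "Entity", "name": "Susan"})
    subgraph = SimpleNamespace(nodes={"c1": chunk, "e1": entity})

    async def main():
        async for text in extract_subgraph_chunks([subgraph]):
            print(text)  # -> Review code before merging.

    asyncio.run(main())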
+ """ + print(f"Coding rules conversation with manager: {coding_rules_chat_from_manager}") + + # Add the text, and make it available for cognify + await cognee.add([coding_rules_chat_from_principal_engineer, coding_rules_chat_from_manager]) + print("Text added successfully.\n") + + # Use LLMs and cognee to create knowledge graph + await cognee.cognify() + print("Cognify process complete.\n") + + # Visualize graph after cognification + file_path = os.path.join( + pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_only_cognify.html" + ) + await visualize_graph(file_path) + print(f"Open file to see graph visualization only after cognification: {file_path}\n") + + # After graph is created, create a second pipeline that will go through the graph and enchance it with specific + # coding rule nodes + + # extract_subgraph_chunks is a function that returns all document chunks from specified subgraphs (if no subgraph is specifed the whole graph will be sent through memify) + subgraph_extraction_tasks = [Task(extract_subgraph_chunks)] + + # add_rule_associations is a function that handles processing coding rules from chunks and keeps track of + # existing rules so duplicate rules won't be created. As the result of this processing new Rule nodes will be created + # in the graph that specify coding rules found in conversations. + coding_rules_association_tasks = [ + Task( + add_rule_associations, + rules_nodeset_name="coding_agent_rules", + task_config={"batch_size": 1}, + ), + ] + + # Memify accepts these tasks and orchestrates forwarding of graph data through these tasks (if data is not specified). + # If data is explicitely specified in the arguments this specified data will be forwarded through the tasks instead + await memify( + extraction_tasks=subgraph_extraction_tasks, + enrichment_tasks=coding_rules_association_tasks, + ) + + # Find the new specific coding rules added to graph through memify (created based on chat conversation between team members) + coding_rules = await cognee.search( + query_text="List me the coding rules", + query_type=cognee.SearchType.CODING_RULES, + node_name=["coding_agent_rules"], + ) + + print("Coding rules created by memify:") + for coding_rule in coding_rules: + print("- " + coding_rule) + + # Visualize new graph with added memify context + file_path = os.path.join( + pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_after_memify.html" + ) + await visualize_graph(file_path) + print(f"\nOpen file to see graph visualization after memify enhancment: {file_path}") + + +if __name__ == "__main__": + logger = setup_logging(log_level=ERROR) + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + try: + loop.run_until_complete(main()) + finally: + loop.run_until_complete(loop.shutdown_asyncgens())