feat: Add memify router

This commit is contained in:
Igor Ilic 2025-09-04 19:08:55 +02:00
parent b0d4503f2b
commit 805f443cd6
7 changed files with 118 additions and 11 deletions

View file

@ -22,6 +22,7 @@ from cognee.api.v1.settings.routers import get_settings_router
from cognee.api.v1.datasets.routers import get_datasets_router
from cognee.api.v1.cognify.routers import get_code_pipeline_router, get_cognify_router
from cognee.api.v1.search.routers import get_search_router
from cognee.api.v1.memify.routers import get_memify_router
from cognee.api.v1.add.routers import get_add_router
from cognee.api.v1.delete.routers import get_delete_router
from cognee.api.v1.responses.routers import get_responses_router
@ -230,6 +231,8 @@ app.include_router(get_add_router(), prefix="/api/v1/add", tags=["add"])
app.include_router(get_cognify_router(), prefix="/api/v1/cognify", tags=["cognify"])
app.include_router(get_memify_router(), prefix="/api/v1/memify", tags=["memify"])
app.include_router(get_search_router(), prefix="/api/v1/search", tags=["search"])
app.include_router(

View file

@ -1,6 +1,3 @@
import os
import requests
import subprocess
from uuid import UUID
from fastapi import APIRouter
@ -60,9 +57,6 @@ def get_add_router() -> APIRouter:
## Notes
- To add data to datasets not owned by the user, use dataset_id (when ENABLE_BACKEND_ACCESS_CONTROL is set to True)
- GitHub repositories are cloned and all files are processed
- HTTP URLs are fetched and their content is processed
- The ALLOW_HTTP_REQUESTS environment variable controls URL processing
- datasetId value can only be the UUID of an already existing dataset
"""
send_telemetry(

View file

View file

@ -0,0 +1 @@
from .get_memify_router import get_memify_router

View file

@ -0,0 +1,99 @@
from uuid import UUID
from fastapi import APIRouter
from fastapi.responses import JSONResponse
from fastapi import Depends
from pydantic import Field
from typing import List, Optional
from cognee.api.DTO import InDTO
from cognee.modules.users.models import User
from cognee.modules.users.methods import get_authenticated_user
from cognee.shared.utils import send_telemetry
from cognee.modules.pipelines.models import PipelineRunErrored
from cognee.shared.logging_utils import get_logger
logger = get_logger()
class MemifyPayloadDTO(InDTO):
    """Request payload for POST /v1/memify.

    All fields are optional; at least one of dataset_names / dataset_ids
    must be supplied (enforced by the endpoint handler, not the model).
    """

    # BUG FIX: the original wrote `= (Field(...),)` for enrichment_tasks and
    # data — the trailing comma made the default a 1-tuple containing a
    # FieldInfo, so pydantic never saw a proper Field() and the declared
    # defaults/examples were lost.
    extraction_tasks: Optional[List[str]] = Field(default=None, examples=[[]])
    enrichment_tasks: Optional[List[str]] = Field(default=None, examples=[[]])
    data: Optional[str] = Field(default=None)
    dataset_names: Optional[List[str]] = Field(default=None)
    dataset_ids: Optional[List[UUID]] = Field(default=None, examples=[[]])
    node_name: Optional[List[str]] = Field(default=None)
    # Defaults to blocking execution; forwarded to the memify pipeline.
    run_in_background: Optional[bool] = Field(default=False)
def get_memify_router() -> APIRouter:
    """Build and return the APIRouter exposing the POST /v1/memify endpoint."""
    router = APIRouter()

    @router.post("", response_model=dict)
    async def memify(payload: MemifyPayloadDTO, user: User = Depends(get_authenticated_user)):
        """
        Enrichment pipeline in Cognee, can work with already built graphs. If no data is provided
        the existing knowledge graph will be used as data; custom data can also be provided instead,
        which is processed with the provided extraction and enrichment tasks.
        Provided tasks and data will be arranged to run the Cognee pipeline and execute graph
        enrichment/creation.

        ## Request Parameters
        - **extractionTasks** Optional[List[str]]: List of available Cognee Tasks to execute for graph/data extraction.
        - **enrichmentTasks** Optional[List[str]]: List of available Cognee Tasks to handle enrichment of provided graph/data from extraction tasks.
        - **data** Optional[str]: The data to ingest. Can be any text data when custom extraction and enrichment tasks are used.
          Data provided here will be forwarded to the first extraction task in the pipeline as input.
          If no data is provided the whole graph (or subgraph if node_name is specified) will be forwarded.
        - **dataset_names** (Optional[List[str]]): Names of the datasets to memify.
        - **dataset_ids** (Optional[List[UUID]]): List of UUIDs of already existing datasets.
        - **node_name** (Optional[List[str]]): Filter graph to specific named entities (for targeted search). Used when no data is provided.
        - **run_in_background** (Optional[bool]): Whether to execute processing asynchronously. Defaults to False (blocking).

        Either dataset_names or dataset_ids must be provided.

        ## Response
        Returns information about the memify operation containing:
        - Status of the operation
        - Details about the processed data
        - Any relevant metadata from the ingestion process

        ## Error Codes
        - **400 Bad Request**: Neither dataset_ids nor dataset_names provided
        - **409 Conflict**: Error during memify operation
        - **420**: The pipeline run finished in an errored state
        - **403 Forbidden**: User doesn't have permission to use dataset

        ## Notes
        - To memify datasets not owned by the user, use dataset_id (when ENABLE_BACKEND_ACCESS_CONTROL is set to True)
        - dataset_ids values can only be UUIDs of already existing datasets
        """
        send_telemetry(
            "Memify API Endpoint Invoked",
            user.id,
            additional_properties={"endpoint": "POST /v1/memify"},
        )

        # Validate up front and return the documented 400 — the original raised a
        # bare ValueError, which FastAPI surfaces as an undocumented 500.
        if not payload.dataset_ids and not payload.dataset_names:
            return JSONResponse(
                status_code=400,
                content={"error": "Either datasetId or datasetName must be provided."},
            )

        # Local import — presumably avoids a circular import at module load; keep it here.
        from cognee import memify

        try:
            memify_run = await memify(
                extraction_tasks=payload.extraction_tasks,
                enrichment_tasks=payload.enrichment_tasks,
                data=payload.data,
                # dataset_ids take precedence over dataset_names when both are given.
                datasets=payload.dataset_ids if payload.dataset_ids else payload.dataset_names,
                node_name=payload.node_name,
                user=user,
                # BUG FIX: the payload accepted run_in_background but never
                # forwarded it, so the flag was silently ignored.
                run_in_background=payload.run_in_background,
            )

            if isinstance(memify_run, PipelineRunErrored):
                # NOTE(review): JSONResponse requires JSON-serializable content;
                # confirm PipelineRunErrored serializes as-is (or needs model_dump) — TODO
                return JSONResponse(status_code=420, content=memify_run)

            return memify_run
        except Exception as error:
            # Log before converting to the documented 409 so the failure is traceable.
            logger.error("Error during memify: %s", error)
            return JSONResponse(status_code=409, content={"error": str(error)})

    return router

View file

@ -26,8 +26,8 @@ logger = get_logger("memify")
async def memify(
extraction_tasks: List[Task] = [Task(extract_subgraph_chunks)],
enrichment_tasks: List[Task] = [Task(add_rule_associations)],
extraction_tasks: Union[List[Task], List[str]] = [Task(extract_subgraph_chunks)],
enrichment_tasks: Union[List[Task], List[str]] = [Task(add_rule_associations)],
data: Optional[Any] = None,
datasets: Union[str, list[str], list[UUID]] = None,
user: User = None,
@ -38,6 +38,15 @@ async def memify(
run_in_background: bool = False,
):
"""
Enrichment pipeline in Cognee, can work with already built graphs. If no data is provided existing knowledge graph will be used as data,
custom data can also be provided instead which can be processed with provided extraction and enrichment tasks.
Provided tasks and data will be arranged to run the Cognee pipeline and execute graph enrichment/creation.
This is the core processing step in Cognee that converts raw text and documents
into an intelligent knowledge graph. It analyzes content, extracts entities and
relationships, and creates semantic connections for enhanced search and reasoning.
Args:
extraction_tasks: List of Cognee Tasks to execute for graph/data extraction.
enrichment_tasks: List of Cognee Tasks to handle enrichment of provided graph/data from extraction tasks.

View file

@ -55,7 +55,7 @@ async def main():
pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_only_cognify.html"
)
await visualize_graph(file_path)
print(f"Open file to see graph visualization only after cognification: {file_path}")
print(f"Open file to see graph visualization only after cognification: {file_path}\n")
# After graph is created, create a second pipeline that will go through the graph and enchance it with specific
# coding rule nodes
@ -88,15 +88,16 @@ async def main():
node_name=["coding_agent_rules"],
)
print("Coding rules created by memify:")
for coding_rule in coding_rules:
print(coding_rule)
print("- " + coding_rule)
# Visualize new graph with added memify context
file_path = os.path.join(
pathlib.Path(__file__).parent, ".artifacts", "graph_visualization_after_memify.html"
)
await visualize_graph(file_path)
print(f"Open file to see graph visualization after memify enhancement: {file_path}")
print(f"\nOpen file to see graph visualization after memify enhancement: {file_path}")
if __name__ == "__main__":