From 6996cdb887cf69acc58abd185cd1b2c50c95188a Mon Sep 17 00:00:00 2001 From: Daulet Amirkhanov Date: Fri, 31 Oct 2025 16:48:50 +0000 Subject: [PATCH] Refactor: break down `server.py`, extract tools --- cognee-mcp/src/clients/__init__.py | 3 + cognee-mcp/src/{ => clients}/cognee_client.py | 0 cognee-mcp/src/server.py | 992 +----------------- cognee-mcp/src/shared/__init__.py | 3 + cognee-mcp/src/shared/context.py | 11 + cognee-mcp/src/tools/__init__.py | 27 + cognee-mcp/src/tools/codify.py | 79 ++ cognee-mcp/src/tools/codify_status.py | 51 + .../src/tools/cognee_add_developer_rules.py | 103 ++ cognee-mcp/src/tools/cognify.py | 178 ++++ cognee-mcp/src/tools/cognify_status.py | 51 + cognee-mcp/src/tools/delete.py | 90 ++ cognee-mcp/src/tools/get_developer_rules.py | 54 + cognee-mcp/src/tools/list_data.py | 137 +++ cognee-mcp/src/tools/prune.py | 45 + cognee-mcp/src/tools/save_interaction.py | 75 ++ cognee-mcp/src/tools/search.py | 166 +++ cognee-mcp/src/tools/utils.py | 35 + 18 files changed, 1136 insertions(+), 964 deletions(-) create mode 100644 cognee-mcp/src/clients/__init__.py rename cognee-mcp/src/{ => clients}/cognee_client.py (100%) create mode 100644 cognee-mcp/src/shared/__init__.py create mode 100644 cognee-mcp/src/shared/context.py create mode 100644 cognee-mcp/src/tools/__init__.py create mode 100644 cognee-mcp/src/tools/codify.py create mode 100644 cognee-mcp/src/tools/codify_status.py create mode 100644 cognee-mcp/src/tools/cognee_add_developer_rules.py create mode 100644 cognee-mcp/src/tools/cognify.py create mode 100644 cognee-mcp/src/tools/cognify_status.py create mode 100644 cognee-mcp/src/tools/delete.py create mode 100644 cognee-mcp/src/tools/get_developer_rules.py create mode 100644 cognee-mcp/src/tools/list_data.py create mode 100644 cognee-mcp/src/tools/prune.py create mode 100644 cognee-mcp/src/tools/save_interaction.py create mode 100644 cognee-mcp/src/tools/search.py create mode 100644 cognee-mcp/src/tools/utils.py diff --git 
a/cognee-mcp/src/clients/__init__.py b/cognee-mcp/src/clients/__init__.py new file mode 100644 index 000000000..a77c5345e --- /dev/null +++ b/cognee-mcp/src/clients/__init__.py @@ -0,0 +1,3 @@ +from .cognee_client import CogneeClient + +__all__ = ["CogneeClient"] diff --git a/cognee-mcp/src/cognee_client.py b/cognee-mcp/src/clients/cognee_client.py similarity index 100% rename from cognee-mcp/src/cognee_client.py rename to cognee-mcp/src/clients/cognee_client.py diff --git a/cognee-mcp/src/server.py b/cognee-mcp/src/server.py index ce6dad88a..fedb4b527 100755 --- a/cognee-mcp/src/server.py +++ b/cognee-mcp/src/server.py @@ -1,46 +1,47 @@ -import json -import os import sys import argparse import asyncio import subprocess from pathlib import Path -from typing import Optional -from cognee.shared.logging_utils import get_logger, setup_logging, get_log_file_location -import importlib.util -from contextlib import redirect_stdout -import mcp.types as types +from cognee.shared.logging_utils import get_logger, setup_logging from mcp.server import FastMCP -from cognee.modules.storage.utils import JSONEncoder from starlette.responses import JSONResponse -from starlette.middleware import Middleware from starlette.middleware.cors import CORSMiddleware import uvicorn -try: - from .cognee_client import CogneeClient -except ImportError: - from cognee_client import CogneeClient - - -try: - from cognee.tasks.codingagents.coding_rule_associations import ( - add_rule_associations, - get_existing_rules, - ) -except ModuleNotFoundError: - from .codingagents.coding_rule_associations import ( - add_rule_associations, - get_existing_rules, - ) +from src.shared import context +from src.clients import CogneeClient +from src.tools import ( + cognee_add_developer_rules, + cognify, + save_interaction, + codify, + search, + get_developer_rules, + list_data, + delete, + prune, + cognify_status, + codify_status, +) mcp = FastMCP("Cognee") logger = get_logger() -cognee_client: 
Optional[CogneeClient] = None +mcp.tool()(cognee_add_developer_rules) +mcp.tool()(cognify) +mcp.tool()(save_interaction) +mcp.tool()(codify) +mcp.tool()(search) +mcp.tool()(get_developer_rules) +mcp.tool()(list_data) +mcp.tool()(delete) +mcp.tool()(prune) +mcp.tool()(cognify_status) +mcp.tool()(codify_status) async def run_sse_with_cors(): @@ -90,945 +91,7 @@ async def health_check(request): return JSONResponse({"status": "ok"}) -@mcp.tool() -async def cognee_add_developer_rules( - base_path: str = ".", graph_model_file: str = None, graph_model_name: str = None -) -> list: - """ - Ingest core developer rule files into Cognee's memory layer. - - This function loads a predefined set of developer-related configuration, - rule, and documentation files from the base repository and assigns them - to the special 'developer_rules' node set in Cognee. It ensures these - foundational files are always part of the structured memory graph. - - Parameters - ---------- - base_path : str - Root path to resolve relative file paths. Defaults to current directory. - - graph_model_file : str, optional - Optional path to a custom schema file for knowledge graph generation. - - graph_model_name : str, optional - Optional class name to use from the graph_model_file schema. - - Returns - ------- - list - A message indicating how many rule files were scheduled for ingestion, - and how to check their processing status. - - Notes - ----- - - Each file is processed asynchronously in the background. - - Files are attached to the 'developer_rules' node set. - - Missing files are skipped with a logged warning. 
- """ - - developer_rule_paths = [ - ".cursorrules", - ".cursor/rules", - ".same/todos.md", - ".windsurfrules", - ".clinerules", - "CLAUDE.md", - ".sourcegraph/memory.md", - "AGENT.md", - "AGENTS.md", - ] - - async def cognify_task(file_path: str) -> None: - with redirect_stdout(sys.stderr): - logger.info(f"Starting cognify for: {file_path}") - try: - await cognee_client.add(file_path, node_set=["developer_rules"]) - - model = None - if graph_model_file and graph_model_name: - if cognee_client.use_api: - logger.warning( - "Custom graph models are not supported in API mode, ignoring." - ) - else: - from cognee.shared.data_models import KnowledgeGraph - - model = load_class(graph_model_file, graph_model_name) - - await cognee_client.cognify(graph_model=model) - logger.info(f"Cognify finished for: {file_path}") - except Exception as e: - logger.error(f"Cognify failed for {file_path}: {str(e)}") - raise ValueError(f"Failed to cognify: {str(e)}") - - tasks = [] - for rel_path in developer_rule_paths: - abs_path = os.path.join(base_path, rel_path) - if os.path.isfile(abs_path): - tasks.append(asyncio.create_task(cognify_task(abs_path))) - else: - logger.warning(f"Skipped missing developer rule file: {abs_path}") - log_file = get_log_file_location() - return [ - types.TextContent( - type="text", - text=( - f"Started cognify for {len(tasks)} developer rule files in background.\n" - f"All are added to the `developer_rules` node set.\n" - f"Use `cognify_status` or check logs at {log_file} to monitor progress." - ), - ) - ] - - -@mcp.tool() -async def cognify( - data: str, graph_model_file: str = None, graph_model_name: str = None, custom_prompt: str = None -) -> list: - """ - Transform ingested data into a structured knowledge graph. - - This is the core processing step in Cognee that converts raw text and documents - into an intelligent knowledge graph. 
It analyzes content, extracts entities and - relationships, and creates semantic connections for enhanced search and reasoning. - - Prerequisites: - - **LLM_API_KEY**: Must be configured (required for entity extraction and graph generation) - - **Data Added**: Must have data previously added via `cognee.add()` - - **Vector Database**: Must be accessible for embeddings storage - - **Graph Database**: Must be accessible for relationship storage - - Input Requirements: - - **Content Types**: Works with any text-extractable content including: - * Natural language documents - * Structured data (CSV, JSON) - * Code repositories - * Academic papers and technical documentation - * Mixed multimedia content (with text extraction) - - Processing Pipeline: - 1. **Document Classification**: Identifies document types and structures - 2. **Permission Validation**: Ensures user has processing rights - 3. **Text Chunking**: Breaks content into semantically meaningful segments - 4. **Entity Extraction**: Identifies key concepts, people, places, organizations - 5. **Relationship Detection**: Discovers connections between entities - 6. **Graph Construction**: Builds semantic knowledge graph with embeddings - 7. **Content Summarization**: Creates hierarchical summaries for navigation - - Parameters - ---------- - data : str - The data to be processed and transformed into structured knowledge. - This can include natural language, file location, or any text-based information - that should become part of the agent's memory. - - graph_model_file : str, optional - Path to a custom schema file that defines the structure of the generated knowledge graph. - If provided, this file will be loaded using importlib to create a custom graph model. - Default is None, which uses Cognee's built-in KnowledgeGraph model. - - graph_model_name : str, optional - Name of the class within the graph_model_file to instantiate as the graph model. - Required if graph_model_file is specified. 
- Default is None, which uses the default KnowledgeGraph class. - - custom_prompt : str, optional - Custom prompt string to use for entity extraction and graph generation. - If provided, this prompt will be used instead of the default prompts for - knowledge graph extraction. The prompt should guide the LLM on how to - extract entities and relationships from the text content. - - Returns - ------- - list - A list containing a single TextContent object with information about the - background task launch and how to check its status. - - Next Steps: - After successful cognify processing, use search functions to query the knowledge: - - ```python - import cognee - from cognee import SearchType - - # Process your data into knowledge graph - await cognee.cognify() - - # Query for insights using different search types: - - # 1. Natural language completion with graph context - insights = await cognee.search( - "What are the main themes?", - query_type=SearchType.GRAPH_COMPLETION - ) - - # 2. Get entity relationships and connections - relationships = await cognee.search( - "connections between concepts", - query_type=SearchType.GRAPH_COMPLETION - ) - - # 3. 
Find relevant document chunks - chunks = await cognee.search( - "specific topic", - query_type=SearchType.CHUNKS - ) - ``` - - Environment Variables: - Required: - - LLM_API_KEY: API key for your LLM provider - - Optional: - - LLM_PROVIDER, LLM_MODEL, VECTOR_DB_PROVIDER, GRAPH_DATABASE_PROVIDER - - LLM_RATE_LIMIT_ENABLED: Enable rate limiting (default: False) - - LLM_RATE_LIMIT_REQUESTS: Max requests per interval (default: 60) - - Notes - ----- - - The function launches a background task and returns immediately - - The actual cognify process may take significant time depending on text length - - Use the cognify_status tool to check the progress of the operation - - """ - - async def cognify_task( - data: str, - graph_model_file: str = None, - graph_model_name: str = None, - custom_prompt: str = None, - ) -> str: - """Build knowledge graph from the input text""" - # NOTE: MCP uses stdout to communicate, we must redirect all output - # going to stdout ( like the print function ) to stderr. 
- with redirect_stdout(sys.stderr): - logger.info("Cognify process starting.") - - graph_model = None - if graph_model_file and graph_model_name: - if cognee_client.use_api: - logger.warning("Custom graph models are not supported in API mode, ignoring.") - else: - from cognee.shared.data_models import KnowledgeGraph - - graph_model = load_class(graph_model_file, graph_model_name) - - await cognee_client.add(data) - - try: - await cognee_client.cognify(custom_prompt=custom_prompt, graph_model=graph_model) - logger.info("Cognify process finished.") - except Exception as e: - logger.error("Cognify process failed.") - raise ValueError(f"Failed to cognify: {str(e)}") - - asyncio.create_task( - cognify_task( - data=data, - graph_model_file=graph_model_file, - graph_model_name=graph_model_name, - custom_prompt=custom_prompt, - ) - ) - - log_file = get_log_file_location() - text = ( - f"Background process launched due to MCP timeout limitations.\n" - f"To check current cognify status use the cognify_status tool\n" - f"or check the log file at: {log_file}" - ) - - return [ - types.TextContent( - type="text", - text=text, - ) - ] - - -@mcp.tool( - name="save_interaction", description="Logs user-agent interactions and query-answer pairs" -) -async def save_interaction(data: str) -> list: - """ - Transform and save a user-agent interaction into structured knowledge. - - Parameters - ---------- - data : str - The input string containing user queries and corresponding agent answers. - - Returns - ------- - list - A list containing a single TextContent object with information about the background task launch. 
- """ - - async def save_user_agent_interaction(data: str) -> None: - """Build knowledge graph from the interaction data""" - with redirect_stdout(sys.stderr): - logger.info("Save interaction process starting.") - - await cognee_client.add(data, node_set=["user_agent_interaction"]) - - try: - await cognee_client.cognify() - logger.info("Save interaction process finished.") - - # Rule associations only work in direct mode - if not cognee_client.use_api: - logger.info("Generating associated rules from interaction data.") - await add_rule_associations(data=data, rules_nodeset_name="coding_agent_rules") - logger.info("Associated rules generated from interaction data.") - else: - logger.warning("Rule associations are not available in API mode, skipping.") - - except Exception as e: - logger.error("Save interaction process failed.") - raise ValueError(f"Failed to Save interaction: {str(e)}") - - asyncio.create_task( - save_user_agent_interaction( - data=data, - ) - ) - - log_file = get_log_file_location() - text = ( - f"Background process launched to process the user-agent interaction.\n" - f"To check the current status, use the cognify_status tool or check the log file at: {log_file}" - ) - - return [ - types.TextContent( - type="text", - text=text, - ) - ] - - -@mcp.tool() -async def codify(repo_path: str) -> list: - """ - Analyze and generate a code-specific knowledge graph from a software repository. - - This function launches a background task that processes the provided repository - and builds a code knowledge graph. The function returns immediately while - the processing continues in the background due to MCP timeout constraints. - - Parameters - ---------- - repo_path : str - Path to the code repository to analyze. This can be a local file path or a - relative path to a repository. The path should point to the root of the - repository or a specific directory within it. 
- - Returns - ------- - list - A list containing a single TextContent object with information about the - background task launch and how to check its status. - - Notes - ----- - - The function launches a background task and returns immediately - - The code graph generation may take significant time for larger repositories - - Use the codify_status tool to check the progress of the operation - - Process results are logged to the standard Cognee log file - - All stdout is redirected to stderr to maintain MCP communication integrity - """ - - if cognee_client.use_api: - error_msg = "❌ Codify operation is not available in API mode. Please use direct mode for code graph pipeline." - logger.error(error_msg) - return [types.TextContent(type="text", text=error_msg)] - - async def codify_task(repo_path: str): - # NOTE: MCP uses stdout to communicate, we must redirect all output - # going to stdout ( like the print function ) to stderr. - with redirect_stdout(sys.stderr): - logger.info("Codify process starting.") - from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline - - results = [] - async for result in run_code_graph_pipeline(repo_path, False): - results.append(result) - logger.info(result) - if all(results): - logger.info("Codify process finished succesfully.") - else: - logger.info("Codify process failed.") - - asyncio.create_task(codify_task(repo_path)) - - log_file = get_log_file_location() - text = ( - f"Background process launched due to MCP timeout limitations.\n" - f"To check current codify status use the codify_status tool\n" - f"or you can check the log file at: {log_file}" - ) - - return [ - types.TextContent( - type="text", - text=text, - ) - ] - - -@mcp.tool() -async def search(search_query: str, search_type: str) -> list: - """ - Search and query the knowledge graph for insights, information, and connections. - - This is the final step in the Cognee workflow that retrieves information from the - processed knowledge graph. 
It supports multiple search modes optimized for different - use cases - from simple fact retrieval to complex reasoning and code analysis. - - Search Prerequisites: - - **LLM_API_KEY**: Required for GRAPH_COMPLETION and RAG_COMPLETION search types - - **Data Added**: Must have data previously added via `cognee.add()` - - **Knowledge Graph Built**: Must have processed data via `cognee.cognify()` - - **Vector Database**: Must be accessible for semantic search functionality - - Search Types & Use Cases: - - **GRAPH_COMPLETION** (Recommended): - Natural language Q&A using full graph context and LLM reasoning. - Best for: Complex questions, analysis, summaries, insights. - Returns: Conversational AI responses with graph-backed context. - - **RAG_COMPLETION**: - Traditional RAG using document chunks without graph structure. - Best for: Direct document retrieval, specific fact-finding. - Returns: LLM responses based on relevant text chunks. - - **CHUNKS**: - Raw text segments that match the query semantically. - Best for: Finding specific passages, citations, exact content. - Returns: Ranked list of relevant text chunks with metadata. - - **SUMMARIES**: - Pre-generated hierarchical summaries of content. - Best for: Quick overviews, document abstracts, topic summaries. - Returns: Multi-level summaries from detailed to high-level. - - **CODE**: - Code-specific search with syntax and semantic understanding. - Best for: Finding functions, classes, implementation patterns. - Returns: Structured code information with context and relationships. - - **CYPHER**: - Direct graph database queries using Cypher syntax. - Best for: Advanced users, specific graph traversals, debugging. - Returns: Raw graph query results. - - **FEELING_LUCKY**: - Intelligently selects and runs the most appropriate search type. - Best for: General-purpose queries or when you're unsure which search type is best. - Returns: The results from the automatically selected search type. 
- - Parameters - ---------- - search_query : str - Your question or search query in natural language. - Examples: - - "What are the main themes in this research?" - - "How do these concepts relate to each other?" - - "Find information about machine learning algorithms" - - "What functions handle user authentication?" - - search_type : str - The type of search to perform. Valid options include: - - "GRAPH_COMPLETION": Returns an LLM response based on the search query and Cognee's memory - - "RAG_COMPLETION": Returns an LLM response based on the search query and standard RAG data - - "CODE": Returns code-related knowledge in JSON format - - "CHUNKS": Returns raw text chunks from the knowledge graph - - "SUMMARIES": Returns pre-generated hierarchical summaries - - "CYPHER": Direct graph database queries - - "FEELING_LUCKY": Automatically selects best search type - - The search_type is case-insensitive and will be converted to uppercase. - - Returns - ------- - list - A list containing a single TextContent object with the search results. 
- The format of the result depends on the search_type: - - **GRAPH_COMPLETION/RAG_COMPLETION**: Conversational AI response strings - - **CHUNKS**: Relevant text passages with source metadata - - **SUMMARIES**: Hierarchical summaries from general to specific - - **CODE**: Structured code information with context - - **FEELING_LUCKY**: Results in format of automatically selected search type - - **CYPHER**: Raw graph query results - - Performance & Optimization: - - **GRAPH_COMPLETION**: Slower but most intelligent, uses LLM + graph context - - **RAG_COMPLETION**: Medium speed, uses LLM + document chunks (no graph traversal) - - **CHUNKS**: Fastest, pure vector similarity search without LLM - - **SUMMARIES**: Fast, returns pre-computed summaries - - **CODE**: Medium speed, specialized for code understanding - - **FEELING_LUCKY**: Variable speed, uses LLM + search type selection intelligently - - Environment Variables: - Required for LLM-based search types (GRAPH_COMPLETION, RAG_COMPLETION): - - LLM_API_KEY: API key for your LLM provider - - Optional: - - LLM_PROVIDER, LLM_MODEL: Configure LLM for search responses - - VECTOR_DB_PROVIDER: Must match what was used during cognify - - GRAPH_DATABASE_PROVIDER: Must match what was used during cognify - - Notes - ----- - - Different search types produce different output formats - - The function handles the conversion between Cognee's internal result format and MCP's output format - - """ - - async def search_task(search_query: str, search_type: str) -> str: - """Search the knowledge graph""" - # NOTE: MCP uses stdout to communicate, we must redirect all output - # going to stdout ( like the print function ) to stderr. 
- with redirect_stdout(sys.stderr): - search_results = await cognee_client.search( - query_text=search_query, query_type=search_type - ) - - # Handle different result formats based on API vs direct mode - if cognee_client.use_api: - # API mode returns JSON-serialized results - if isinstance(search_results, str): - return search_results - elif isinstance(search_results, list): - if ( - search_type.upper() in ["GRAPH_COMPLETION", "RAG_COMPLETION"] - and len(search_results) > 0 - ): - return str(search_results[0]) - return str(search_results) - else: - return json.dumps(search_results, cls=JSONEncoder) - else: - # Direct mode processing - if search_type.upper() == "CODE": - return json.dumps(search_results, cls=JSONEncoder) - elif ( - search_type.upper() == "GRAPH_COMPLETION" - or search_type.upper() == "RAG_COMPLETION" - ): - return str(search_results[0]) - elif search_type.upper() == "CHUNKS": - return str(search_results) - elif search_type.upper() == "INSIGHTS": - results = retrieved_edges_to_string(search_results) - return results - else: - return str(search_results) - - search_results = await search_task(search_query, search_type) - return [types.TextContent(type="text", text=search_results)] - - -@mcp.tool() -async def get_developer_rules() -> list: - """ - Retrieve all developer rules that were generated based on previous interactions. - - This tool queries the Cognee knowledge graph and returns a list of developer - rules. - - Parameters - ---------- - None - - Returns - ------- - list - A list containing a single TextContent object with the retrieved developer rules. - The format is plain text containing the developer rules in bulletpoints. - - Notes - ----- - - The specific logic for fetching rules is handled internally. - - This tool does not accept any parameters and is intended for simple rule inspection use cases. 
- """ - - async def fetch_rules_from_cognee() -> str: - """Collect all developer rules from Cognee""" - with redirect_stdout(sys.stderr): - if cognee_client.use_api: - logger.warning("Developer rules retrieval is not available in API mode") - return "Developer rules retrieval is not available in API mode" - - developer_rules = await get_existing_rules(rules_nodeset_name="coding_agent_rules") - return developer_rules - - rules_text = await fetch_rules_from_cognee() - - return [types.TextContent(type="text", text=rules_text)] - - -@mcp.tool() -async def list_data(dataset_id: str = None) -> list: - """ - List all datasets and their data items with IDs for deletion operations. - - This function helps users identify data IDs and dataset IDs that can be used - with the delete tool. It provides a comprehensive view of available data. - - Parameters - ---------- - dataset_id : str, optional - If provided, only list data items from this specific dataset. - If None, lists all datasets and their data items. - Should be a valid UUID string. - - Returns - ------- - list - A list containing a single TextContent object with formatted information - about datasets and data items, including their IDs for deletion. 
- - Notes - ----- - - Use this tool to identify data_id and dataset_id values for the delete tool - - The output includes both dataset information and individual data items - - UUIDs are displayed in a format ready for use with other tools - """ - from uuid import UUID - - with redirect_stdout(sys.stderr): - try: - output_lines = [] - - if dataset_id: - # Detailed data listing for specific dataset is only available in direct mode - if cognee_client.use_api: - return [ - types.TextContent( - type="text", - text="❌ Detailed data listing for specific datasets is not available in API mode.\nPlease use the API directly or use direct mode.", - ) - ] - - from cognee.modules.users.methods import get_default_user - from cognee.modules.data.methods import get_dataset, get_dataset_data - - logger.info(f"Listing data for dataset: {dataset_id}") - dataset_uuid = UUID(dataset_id) - user = await get_default_user() - - dataset = await get_dataset(user.id, dataset_uuid) - - if not dataset: - return [ - types.TextContent(type="text", text=f"❌ Dataset not found: {dataset_id}") - ] - - # Get data items in the dataset - data_items = await get_dataset_data(dataset.id) - - output_lines.append(f"📁 Dataset: {dataset.name}") - output_lines.append(f" ID: {dataset.id}") - output_lines.append(f" Created: {dataset.created_at}") - output_lines.append(f" Data items: {len(data_items)}") - output_lines.append("") - - if data_items: - for i, data_item in enumerate(data_items, 1): - output_lines.append(f" 📄 Data item #{i}:") - output_lines.append(f" Data ID: {data_item.id}") - output_lines.append(f" Name: {data_item.name or 'Unnamed'}") - output_lines.append(f" Created: {data_item.created_at}") - output_lines.append("") - else: - output_lines.append(" (No data items in this dataset)") - - else: - # List all datasets - works in both modes - logger.info("Listing all datasets") - datasets = await cognee_client.list_datasets() - - if not datasets: - return [ - types.TextContent( - type="text", - text="📂 
No datasets found.\nUse the cognify tool to create your first dataset!", - ) - ] - - output_lines.append("📂 Available Datasets:") - output_lines.append("=" * 50) - output_lines.append("") - - for i, dataset in enumerate(datasets, 1): - # In API mode, dataset is a dict; in direct mode, it's formatted as dict - if isinstance(dataset, dict): - output_lines.append(f"{i}. 📁 {dataset.get('name', 'Unnamed')}") - output_lines.append(f" Dataset ID: {dataset.get('id')}") - output_lines.append(f" Created: {dataset.get('created_at', 'N/A')}") - else: - output_lines.append(f"{i}. 📁 {dataset.name}") - output_lines.append(f" Dataset ID: {dataset.id}") - output_lines.append(f" Created: {dataset.created_at}") - output_lines.append("") - - if not cognee_client.use_api: - output_lines.append("💡 To see data items in a specific dataset, use:") - output_lines.append(' list_data(dataset_id="your-dataset-id-here")') - output_lines.append("") - output_lines.append("🗑️ To delete specific data, use:") - output_lines.append(' delete(data_id="data-id", dataset_id="dataset-id")') - - result_text = "\n".join(output_lines) - logger.info("List data operation completed successfully") - - return [types.TextContent(type="text", text=result_text)] - - except ValueError as e: - error_msg = f"❌ Invalid UUID format: {str(e)}" - logger.error(error_msg) - return [types.TextContent(type="text", text=error_msg)] - - except Exception as e: - error_msg = f"❌ Failed to list data: {str(e)}" - logger.error(f"List data error: {str(e)}") - return [types.TextContent(type="text", text=error_msg)] - - -@mcp.tool() -async def delete(data_id: str, dataset_id: str, mode: str = "soft") -> list: - """ - Delete specific data from a dataset in the Cognee knowledge graph. - - This function removes a specific data item from a dataset while keeping the - dataset itself intact. It supports both soft and hard deletion modes. 
- - Parameters - ---------- - data_id : str - The UUID of the data item to delete from the knowledge graph. - This should be a valid UUID string identifying the specific data item. - - dataset_id : str - The UUID of the dataset containing the data to be deleted. - This should be a valid UUID string identifying the dataset. - - mode : str, optional - The deletion mode to use. Options are: - - "soft" (default): Removes the data but keeps related entities that might be shared - - "hard": Also removes degree-one entity nodes that become orphaned after deletion - Default is "soft" for safer deletion that preserves shared knowledge. - - Returns - ------- - list - A list containing a single TextContent object with the deletion results, - including status, deleted node counts, and confirmation details. - - Notes - ----- - - This operation cannot be undone. The specified data will be permanently removed. - - Hard mode may remove additional entity nodes that become orphaned - - The function provides detailed feedback about what was deleted - - Use this for targeted deletion instead of the prune tool which removes everything - """ - from uuid import UUID - - with redirect_stdout(sys.stderr): - try: - logger.info( - f"Starting delete operation for data_id: {data_id}, dataset_id: {dataset_id}, mode: {mode}" - ) - - # Convert string UUIDs to UUID objects - data_uuid = UUID(data_id) - dataset_uuid = UUID(dataset_id) - - # Call the cognee delete function via client - result = await cognee_client.delete( - data_id=data_uuid, dataset_id=dataset_uuid, mode=mode - ) - - logger.info(f"Delete operation completed successfully: {result}") - - # Format the result for MCP response - formatted_result = json.dumps(result, indent=2, cls=JSONEncoder) - - return [ - types.TextContent( - type="text", - text=f"✅ Delete operation completed successfully!\n\n{formatted_result}", - ) - ] - - except ValueError as e: - # Handle UUID parsing errors - error_msg = f"❌ Invalid UUID format: {str(e)}" - 
logger.error(error_msg) - return [types.TextContent(type="text", text=error_msg)] - - except Exception as e: - # Handle all other errors (DocumentNotFoundError, DatasetNotFoundError, etc.) - error_msg = f"❌ Delete operation failed: {str(e)}" - logger.error(f"Delete operation error: {str(e)}") - return [types.TextContent(type="text", text=error_msg)] - - -@mcp.tool() -async def prune(): - """ - Reset the Cognee knowledge graph by removing all stored information. - - This function performs a complete reset of both the data layer and system layer - of the Cognee knowledge graph, removing all nodes, edges, and associated metadata. - It is typically used during development or when needing to start fresh with a new - knowledge base. - - Returns - ------- - list - A list containing a single TextContent object with confirmation of the prune operation. - - Notes - ----- - - This operation cannot be undone. All memory data will be permanently deleted. - - The function prunes both data content (using prune_data) and system metadata (using prune_system) - - This operation is not available in API mode - """ - with redirect_stdout(sys.stderr): - try: - await cognee_client.prune_data() - await cognee_client.prune_system(metadata=True) - return [types.TextContent(type="text", text="Pruned")] - except NotImplementedError: - error_msg = "❌ Prune operation is not available in API mode" - logger.error(error_msg) - return [types.TextContent(type="text", text=error_msg)] - except Exception as e: - error_msg = f"❌ Prune operation failed: {str(e)}" - logger.error(error_msg) - return [types.TextContent(type="text", text=error_msg)] - - -@mcp.tool() -async def cognify_status(): - """ - Get the current status of the cognify pipeline. - - This function retrieves information about current and recently completed cognify operations - in the main_dataset. It provides details on progress, success/failure status, and statistics - about the processed data. 
- - Returns - ------- - list - A list containing a single TextContent object with the status information as a string. - The status includes information about active and completed jobs for the cognify_pipeline. - - Notes - ----- - - The function retrieves pipeline status specifically for the "cognify_pipeline" on the "main_dataset" - - Status information includes job progress, execution time, and completion status - - The status is returned in string format for easy reading - - This operation is not available in API mode - """ - with redirect_stdout(sys.stderr): - try: - from cognee.modules.data.methods.get_unique_dataset_id import get_unique_dataset_id - from cognee.modules.users.methods import get_default_user - - user = await get_default_user() - status = await cognee_client.get_pipeline_status( - [await get_unique_dataset_id("main_dataset", user)], "cognify_pipeline" - ) - return [types.TextContent(type="text", text=str(status))] - except NotImplementedError: - error_msg = "❌ Pipeline status is not available in API mode" - logger.error(error_msg) - return [types.TextContent(type="text", text=error_msg)] - except Exception as e: - error_msg = f"❌ Failed to get cognify status: {str(e)}" - logger.error(error_msg) - return [types.TextContent(type="text", text=error_msg)] - - -@mcp.tool() -async def codify_status(): - """ - Get the current status of the codify pipeline. - - This function retrieves information about current and recently completed codify operations - in the codebase dataset. It provides details on progress, success/failure status, and statistics - about the processed code repositories. - - Returns - ------- - list - A list containing a single TextContent object with the status information as a string. - The status includes information about active and completed jobs for the cognify_code_pipeline. 
- - Notes - ----- - - The function retrieves pipeline status specifically for the "cognify_code_pipeline" on the "codebase" dataset - - Status information includes job progress, execution time, and completion status - - The status is returned in string format for easy reading - - This operation is not available in API mode - """ - with redirect_stdout(sys.stderr): - try: - from cognee.modules.data.methods.get_unique_dataset_id import get_unique_dataset_id - from cognee.modules.users.methods import get_default_user - - user = await get_default_user() - status = await cognee_client.get_pipeline_status( - [await get_unique_dataset_id("codebase", user)], "cognify_code_pipeline" - ) - return [types.TextContent(type="text", text=str(status))] - except NotImplementedError: - error_msg = "❌ Pipeline status is not available in API mode" - logger.error(error_msg) - return [types.TextContent(type="text", text=error_msg)] - except Exception as e: - error_msg = f"❌ Failed to get codify status: {str(e)}" - logger.error(error_msg) - return [types.TextContent(type="text", text=error_msg)] - - -def node_to_string(node): - node_data = ", ".join( - [f'{key}: "{value}"' for key, value in node.items() if key in ["id", "name"]] - ) - - return f"Node({node_data})" - - -def retrieved_edges_to_string(search_results): - edge_strings = [] - for triplet in search_results: - node1, edge, node2 = triplet - relationship_type = edge["relationship_name"] - edge_str = f"{node_to_string(node1)} {relationship_type} {node_to_string(node2)}" - edge_strings.append(edge_str) - - return "\n".join(edge_strings) - - -def load_class(model_file, model_name): - model_file = os.path.abspath(model_file) - spec = importlib.util.spec_from_file_location("graph_model", model_file) - module = importlib.util.module_from_spec(spec) - spec.loader.exec_module(module) - - model_class = getattr(module, model_name) - - return model_class - - async def main(): - global cognee_client - parser = argparse.ArgumentParser() 
parser.add_argument( @@ -1090,6 +153,7 @@ async def main(): # Initialize the global CogneeClient cognee_client = CogneeClient(api_url=args.api_url, api_token=args.api_token) + context.set_cognee_client(cognee_client) mcp.settings.host = args.host mcp.settings.port = args.port diff --git a/cognee-mcp/src/shared/__init__.py b/cognee-mcp/src/shared/__init__.py new file mode 100644 index 000000000..67337ece1 --- /dev/null +++ b/cognee-mcp/src/shared/__init__.py @@ -0,0 +1,3 @@ +from . import context + +__all__ = ["context"] diff --git a/cognee-mcp/src/shared/context.py b/cognee-mcp/src/shared/context.py new file mode 100644 index 000000000..95c4620af --- /dev/null +++ b/cognee-mcp/src/shared/context.py @@ -0,0 +1,11 @@ +from typing import Optional + +from src.clients.cognee_client import CogneeClient + +cognee_client: Optional["CogneeClient"] = None + + +def set_cognee_client(client: "CogneeClient") -> None: + """Set the global cognee client instance.""" + global cognee_client + cognee_client = client diff --git a/cognee-mcp/src/tools/__init__.py b/cognee-mcp/src/tools/__init__.py new file mode 100644 index 000000000..dd2ace6c7 --- /dev/null +++ b/cognee-mcp/src/tools/__init__.py @@ -0,0 +1,27 @@ +"""Cognee MCP Tools - All tools for interacting with the Cognee knowledge graph.""" + +from .cognee_add_developer_rules import cognee_add_developer_rules +from .cognify import cognify +from .save_interaction import save_interaction +from .codify import codify +from .search import search +from .get_developer_rules import get_developer_rules +from .list_data import list_data +from .delete import delete +from .prune import prune +from .cognify_status import cognify_status +from .codify_status import codify_status + +__all__ = [ + "cognee_add_developer_rules", + "cognify", + "save_interaction", + "codify", + "search", + "get_developer_rules", + "list_data", + "delete", + "prune", + "cognify_status", + "codify_status", +] diff --git a/cognee-mcp/src/tools/codify.py 
b/cognee-mcp/src/tools/codify.py new file mode 100644 index 000000000..cec272105 --- /dev/null +++ b/cognee-mcp/src/tools/codify.py @@ -0,0 +1,79 @@ +"""Tool for analyzing and generating code-specific knowledge graphs from repositories.""" + +import sys +import asyncio +from contextlib import redirect_stdout +import mcp.types as types +from cognee.shared.logging_utils import get_logger, get_log_file_location + +from src.shared import context + +logger = get_logger() + + +async def codify(repo_path: str) -> list: + """ + Analyze and generate a code-specific knowledge graph from a software repository. + + This function launches a background task that processes the provided repository + and builds a code knowledge graph. The function returns immediately while + the processing continues in the background due to MCP timeout constraints. + + Parameters + ---------- + repo_path : str + Path to the code repository to analyze. This can be a local file path or a + relative path to a repository. The path should point to the root of the + repository or a specific directory within it. + + Returns + ------- + list + A list containing a single TextContent object with information about the + background task launch and how to check its status. + + Notes + ----- + - The function launches a background task and returns immediately + - The code graph generation may take significant time for larger repositories + - Use the codify_status tool to check the progress of the operation + - Process results are logged to the standard Cognee log file + - All stdout is redirected to stderr to maintain MCP communication integrity + """ + + if context.cognee_client.use_api: + error_msg = "❌ Codify operation is not available in API mode. Please use direct mode for code graph pipeline." 
+ logger.error(error_msg) + return [types.TextContent(type="text", text=error_msg)] + + async def codify_task(repo_path: str): + # NOTE: MCP uses stdout to communicate, we must redirect all output + # going to stdout ( like the print function ) to stderr. + with redirect_stdout(sys.stderr): + logger.info("Codify process starting.") + from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline + + results = [] + async for result in run_code_graph_pipeline(repo_path, False): + results.append(result) + logger.info(result) + if all(results): + logger.info("Codify process finished succesfully.") + else: + logger.info("Codify process failed.") + + asyncio.create_task(codify_task(repo_path)) + + log_file = get_log_file_location() + text = ( + f"Background process launched due to MCP timeout limitations.\n" + f"To check current codify status use the codify_status tool\n" + f"or you can check the log file at: {log_file}" + ) + + return [ + types.TextContent( + type="text", + text=text, + ) + ] diff --git a/cognee-mcp/src/tools/codify_status.py b/cognee-mcp/src/tools/codify_status.py new file mode 100644 index 000000000..43a9936a5 --- /dev/null +++ b/cognee-mcp/src/tools/codify_status.py @@ -0,0 +1,51 @@ +"""Tool for getting the status of the codify pipeline.""" + +import sys +from contextlib import redirect_stdout +import mcp.types as types +from cognee.shared.logging_utils import get_logger + +from src.shared import context + +logger = get_logger() + + +async def codify_status(): + """ + Get the current status of the codify pipeline. + + This function retrieves information about current and recently completed codify operations + in the codebase dataset. It provides details on progress, success/failure status, and statistics + about the processed code repositories. + + Returns + ------- + list + A list containing a single TextContent object with the status information as a string. 
+ The status includes information about active and completed jobs for the cognify_code_pipeline. + + Notes + ----- + - The function retrieves pipeline status specifically for the "cognify_code_pipeline" on the "codebase" dataset + - Status information includes job progress, execution time, and completion status + - The status is returned in string format for easy reading + - This operation is not available in API mode + """ + with redirect_stdout(sys.stderr): + try: + from cognee.modules.data.methods.get_unique_dataset_id import get_unique_dataset_id + from cognee.modules.users.methods import get_default_user + + user = await get_default_user() + status = await context.cognee_client.get_pipeline_status( + [await get_unique_dataset_id("codebase", user)], "cognify_code_pipeline" + ) + return [types.TextContent(type="text", text=str(status))] + except NotImplementedError: + error_msg = "❌ Pipeline status is not available in API mode" + logger.error(error_msg) + return [types.TextContent(type="text", text=error_msg)] + except Exception as e: + error_msg = f"❌ Failed to get codify status: {str(e)}" + logger.error(error_msg) + return [types.TextContent(type="text", text=error_msg)] diff --git a/cognee-mcp/src/tools/cognee_add_developer_rules.py b/cognee-mcp/src/tools/cognee_add_developer_rules.py new file mode 100644 index 000000000..03042acb4 --- /dev/null +++ b/cognee-mcp/src/tools/cognee_add_developer_rules.py @@ -0,0 +1,103 @@ +"""Tool for ingesting core developer rule files into Cognee's memory layer.""" + +import os +import sys +import asyncio +from contextlib import redirect_stdout +import mcp.types as types +from cognee.shared.logging_utils import get_logger, get_log_file_location + +from src.shared import context +from .utils import load_class + +logger = get_logger() + + +async def cognee_add_developer_rules( + base_path: str = ".", graph_model_file: str = None, graph_model_name: str = None +) -> list: + """ + Ingest core developer rule files into Cognee's 
memory layer. + + This function loads a predefined set of developer-related configuration, + rule, and documentation files from the base repository and assigns them + to the special 'developer_rules' node set in Cognee. It ensures these + foundational files are always part of the structured memory graph. + + Parameters + ---------- + base_path : str + Root path to resolve relative file paths. Defaults to current directory. + + graph_model_file : str, optional + Optional path to a custom schema file for knowledge graph generation. + + graph_model_name : str, optional + Optional class name to use from the graph_model_file schema. + + Returns + ------- + list + A message indicating how many rule files were scheduled for ingestion, + and how to check their processing status. + + Notes + ----- + - Each file is processed asynchronously in the background. + - Files are attached to the 'developer_rules' node set. + - Missing files are skipped with a logged warning. + """ + + developer_rule_paths = [ + ".cursorrules", + ".cursor/rules", + ".same/todos.md", + ".windsurfrules", + ".clinerules", + "CLAUDE.md", + ".sourcegraph/memory.md", + "AGENT.md", + "AGENTS.md", + ] + + async def cognify_task(file_path: str) -> None: + with redirect_stdout(sys.stderr): + logger.info(f"Starting cognify for: {file_path}") + try: + await context.cognee_client.add(file_path, node_set=["developer_rules"]) + + model = None + if graph_model_file and graph_model_name: + if context.cognee_client.use_api: + logger.warning( + "Custom graph models are not supported in API mode, ignoring." 
+ ) + else: + from cognee.shared.data_models import KnowledgeGraph + + model = load_class(graph_model_file, graph_model_name) + + await context.cognee_client.cognify(graph_model=model) + logger.info(f"Cognify finished for: {file_path}") + except Exception as e: + logger.error(f"Cognify failed for {file_path}: {str(e)}") + raise ValueError(f"Failed to cognify: {str(e)}") + + tasks = [] + for rel_path in developer_rule_paths: + abs_path = os.path.join(base_path, rel_path) + if os.path.isfile(abs_path): + tasks.append(asyncio.create_task(cognify_task(abs_path))) + else: + logger.warning(f"Skipped missing developer rule file: {abs_path}") + log_file = get_log_file_location() + return [ + types.TextContent( + type="text", + text=( + f"Started cognify for {len(tasks)} developer rule files in background.\n" + f"All are added to the `developer_rules` node set.\n" + f"Use `cognify_status` or check logs at {log_file} to monitor progress." + ), + ) + ] diff --git a/cognee-mcp/src/tools/cognify.py b/cognee-mcp/src/tools/cognify.py new file mode 100644 index 000000000..7d443007d --- /dev/null +++ b/cognee-mcp/src/tools/cognify.py @@ -0,0 +1,178 @@ +"""Tool for transforming data into a structured knowledge graph.""" + +import sys +import asyncio +from contextlib import redirect_stdout +import mcp.types as types +from cognee.shared.logging_utils import get_logger, get_log_file_location + +from src.shared import context +from .utils import load_class + +logger = get_logger() + + +async def cognify( + data: str, graph_model_file: str = None, graph_model_name: str = None, custom_prompt: str = None +) -> list: + """ + Transform ingested data into a structured knowledge graph. + + This is the core processing step in Cognee that converts raw text and documents + into an intelligent knowledge graph. It analyzes content, extracts entities and + relationships, and creates semantic connections for enhanced search and reasoning. 
+ + Prerequisites: + - **LLM_API_KEY**: Must be configured (required for entity extraction and graph generation) + - **Data Added**: Must have data previously added via `cognee.add()` + - **Vector Database**: Must be accessible for embeddings storage + - **Graph Database**: Must be accessible for relationship storage + + Input Requirements: + - **Content Types**: Works with any text-extractable content including: + * Natural language documents + * Structured data (CSV, JSON) + * Code repositories + * Academic papers and technical documentation + * Mixed multimedia content (with text extraction) + + Processing Pipeline: + 1. **Document Classification**: Identifies document types and structures + 2. **Permission Validation**: Ensures user has processing rights + 3. **Text Chunking**: Breaks content into semantically meaningful segments + 4. **Entity Extraction**: Identifies key concepts, people, places, organizations + 5. **Relationship Detection**: Discovers connections between entities + 6. **Graph Construction**: Builds semantic knowledge graph with embeddings + 7. **Content Summarization**: Creates hierarchical summaries for navigation + + Parameters + ---------- + data : str + The data to be processed and transformed into structured knowledge. + This can include natural language, file location, or any text-based information + that should become part of the agent's memory. + + graph_model_file : str, optional + Path to a custom schema file that defines the structure of the generated knowledge graph. + If provided, this file will be loaded using importlib to create a custom graph model. + Default is None, which uses Cognee's built-in KnowledgeGraph model. + + graph_model_name : str, optional + Name of the class within the graph_model_file to instantiate as the graph model. + Required if graph_model_file is specified. + Default is None, which uses the default KnowledgeGraph class. 
+ + custom_prompt : str, optional + Custom prompt string to use for entity extraction and graph generation. + If provided, this prompt will be used instead of the default prompts for + knowledge graph extraction. The prompt should guide the LLM on how to + extract entities and relationships from the text content. + + Returns + ------- + list + A list containing a single TextContent object with information about the + background task launch and how to check its status. + + Next Steps: + After successful cognify processing, use search functions to query the knowledge: + + ```python + import cognee + from cognee import SearchType + + # Process your data into knowledge graph + await cognee.cognify() + + # Query for insights using different search types: + + # 1. Natural language completion with graph context + insights = await cognee.search( + "What are the main themes?", + query_type=SearchType.GRAPH_COMPLETION + ) + + # 2. Get entity relationships and connections + relationships = await cognee.search( + "connections between concepts", + query_type=SearchType.GRAPH_COMPLETION + ) + + # 3. 
Find relevant document chunks + chunks = await cognee.search( + "specific topic", + query_type=SearchType.CHUNKS + ) + ``` + + Environment Variables: + Required: + - LLM_API_KEY: API key for your LLM provider + + Optional: + - LLM_PROVIDER, LLM_MODEL, VECTOR_DB_PROVIDER, GRAPH_DATABASE_PROVIDER + - LLM_RATE_LIMIT_ENABLED: Enable rate limiting (default: False) + - LLM_RATE_LIMIT_REQUESTS: Max requests per interval (default: 60) + + Notes + ----- + - The function launches a background task and returns immediately + - The actual cognify process may take significant time depending on text length + - Use the cognify_status tool to check the progress of the operation + + """ + + async def cognify_task( + data: str, + graph_model_file: str = None, + graph_model_name: str = None, + custom_prompt: str = None, + ) -> str: + """Build knowledge graph from the input text""" + # NOTE: MCP uses stdout to communicate, we must redirect all output + # going to stdout ( like the print function ) to stderr. 
+ with redirect_stdout(sys.stderr): + logger.info("Cognify process starting.") + + graph_model = None + if graph_model_file and graph_model_name: + if context.cognee_client.use_api: + logger.warning("Custom graph models are not supported in API mode, ignoring.") + else: + from cognee.shared.data_models import KnowledgeGraph + + graph_model = load_class(graph_model_file, graph_model_name) + + await context.cognee_client.add(data) + + try: + await context.cognee_client.cognify( + custom_prompt=custom_prompt, graph_model=graph_model + ) + logger.info("Cognify process finished.") + except Exception as e: + logger.error("Cognify process failed.") + raise ValueError(f"Failed to cognify: {str(e)}") + + asyncio.create_task( + cognify_task( + data=data, + graph_model_file=graph_model_file, + graph_model_name=graph_model_name, + custom_prompt=custom_prompt, + ) + ) + + log_file = get_log_file_location() + text = ( + f"Background process launched due to MCP timeout limitations.\n" + f"To check current cognify status use the cognify_status tool\n" + f"or check the log file at: {log_file}" + ) + + return [ + types.TextContent( + type="text", + text=text, + ) + ] diff --git a/cognee-mcp/src/tools/cognify_status.py b/cognee-mcp/src/tools/cognify_status.py new file mode 100644 index 000000000..b3dd00e1d --- /dev/null +++ b/cognee-mcp/src/tools/cognify_status.py @@ -0,0 +1,51 @@ +"""Tool for getting the status of the cognify pipeline.""" + +import sys +from contextlib import redirect_stdout +import mcp.types as types +from cognee.shared.logging_utils import get_logger + +from src.shared import context + +logger = get_logger() + + +async def cognify_status(): + """ + Get the current status of the cognify pipeline. + + This function retrieves information about current and recently completed cognify operations + in the main_dataset. It provides details on progress, success/failure status, and statistics + about the processed data. 
+ + Returns + ------- + list + A list containing a single TextContent object with the status information as a string. + The status includes information about active and completed jobs for the cognify_pipeline. + + Notes + ----- + - The function retrieves pipeline status specifically for the "cognify_pipeline" on the "main_dataset" + - Status information includes job progress, execution time, and completion status + - The status is returned in string format for easy reading + - This operation is not available in API mode + """ + with redirect_stdout(sys.stderr): + try: + from cognee.modules.data.methods.get_unique_dataset_id import get_unique_dataset_id + from cognee.modules.users.methods import get_default_user + + user = await get_default_user() + status = await context.cognee_client.get_pipeline_status( + [await get_unique_dataset_id("main_dataset", user)], "cognify_pipeline" + ) + return [types.TextContent(type="text", text=str(status))] + except NotImplementedError: + error_msg = "❌ Pipeline status is not available in API mode" + logger.error(error_msg) + return [types.TextContent(type="text", text=error_msg)] + except Exception as e: + error_msg = f"❌ Failed to get cognify status: {str(e)}" + logger.error(error_msg) + return [types.TextContent(type="text", text=error_msg)] diff --git a/cognee-mcp/src/tools/delete.py b/cognee-mcp/src/tools/delete.py new file mode 100644 index 000000000..d2b87cbcc --- /dev/null +++ b/cognee-mcp/src/tools/delete.py @@ -0,0 +1,90 @@ +"""Tool for deleting specific data from a dataset.""" + +import sys +import json +from uuid import UUID +from contextlib import redirect_stdout +import mcp.types as types +from cognee.shared.logging_utils import get_logger +from cognee.modules.storage.utils import JSONEncoder + +from src.shared import context + +logger = get_logger() + + +async def delete(data_id: str, dataset_id: str, mode: str = "soft") -> list: + """ + Delete specific data from a dataset in the Cognee knowledge graph. 
+ + This function removes a specific data item from a dataset while keeping the + dataset itself intact. It supports both soft and hard deletion modes. + + Parameters + ---------- + data_id : str + The UUID of the data item to delete from the knowledge graph. + This should be a valid UUID string identifying the specific data item. + + dataset_id : str + The UUID of the dataset containing the data to be deleted. + This should be a valid UUID string identifying the dataset. + + mode : str, optional + The deletion mode to use. Options are: + - "soft" (default): Removes the data but keeps related entities that might be shared + - "hard": Also removes degree-one entity nodes that become orphaned after deletion + Default is "soft" for safer deletion that preserves shared knowledge. + + Returns + ------- + list + A list containing a single TextContent object with the deletion results, + including status, deleted node counts, and confirmation details. + + Notes + ----- + - This operation cannot be undone. The specified data will be permanently removed. 
+ - Hard mode may remove additional entity nodes that become orphaned + - The function provides detailed feedback about what was deleted + - Use this for targeted deletion instead of the prune tool which removes everything + """ + + with redirect_stdout(sys.stderr): + try: + logger.info( + f"Starting delete operation for data_id: {data_id}, dataset_id: {dataset_id}, mode: {mode}" + ) + + # Convert string UUIDs to UUID objects + data_uuid = UUID(data_id) + dataset_uuid = UUID(dataset_id) + + # Call the cognee delete function via client + result = await context.cognee_client.delete( + data_id=data_uuid, dataset_id=dataset_uuid, mode=mode + ) + + logger.info(f"Delete operation completed successfully: {result}") + + # Format the result for MCP response + formatted_result = json.dumps(result, indent=2, cls=JSONEncoder) + + return [ + types.TextContent( + type="text", + text=f"✅ Delete operation completed successfully!\n\n{formatted_result}", + ) + ] + + except ValueError as e: + # Handle UUID parsing errors + error_msg = f"❌ Invalid UUID format: {str(e)}" + logger.error(error_msg) + return [types.TextContent(type="text", text=error_msg)] + + except Exception as e: + # Handle all other errors (DocumentNotFoundError, DatasetNotFoundError, etc.) 
+ error_msg = f"❌ Delete operation failed: {str(e)}" + logger.error(f"Delete operation error: {str(e)}") + return [types.TextContent(type="text", text=error_msg)] diff --git a/cognee-mcp/src/tools/get_developer_rules.py b/cognee-mcp/src/tools/get_developer_rules.py new file mode 100644 index 000000000..a9c7810bc --- /dev/null +++ b/cognee-mcp/src/tools/get_developer_rules.py @@ -0,0 +1,54 @@ +"""Tool for retrieving developer rules from the knowledge graph.""" + +import sys +from contextlib import redirect_stdout +import mcp.types as types +from cognee.shared.logging_utils import get_logger + +from src.shared import context + +logger = get_logger() + +# Import coding agent rules functions +try: + from cognee.tasks.codingagents.coding_rule_associations import get_existing_rules +except ModuleNotFoundError: + from src.codingagents.coding_rule_associations import get_existing_rules + + +async def get_developer_rules() -> list: + """ + Retrieve all developer rules that were generated based on previous interactions. + + This tool queries the Cognee knowledge graph and returns a list of developer + rules. + + Parameters + ---------- + None + + Returns + ------- + list + A list containing a single TextContent object with the retrieved developer rules. + The format is plain text containing the developer rules in bulletpoints. + + Notes + ----- + - The specific logic for fetching rules is handled internally. + - This tool does not accept any parameters and is intended for simple rule inspection use cases. 
+ """ + + async def fetch_rules_from_cognee() -> str: + """Collect all developer rules from Cognee""" + with redirect_stdout(sys.stderr): + if context.cognee_client.use_api: + logger.warning("Developer rules retrieval is not available in API mode") + return "Developer rules retrieval is not available in API mode" + + developer_rules = await get_existing_rules(rules_nodeset_name="coding_agent_rules") + return developer_rules + + rules_text = await fetch_rules_from_cognee() + + return [types.TextContent(type="text", text=rules_text)] diff --git a/cognee-mcp/src/tools/list_data.py b/cognee-mcp/src/tools/list_data.py new file mode 100644 index 000000000..42b962cc5 --- /dev/null +++ b/cognee-mcp/src/tools/list_data.py @@ -0,0 +1,137 @@ +"""Tool for listing datasets and their data items.""" + +import sys +from uuid import UUID +from contextlib import redirect_stdout +import mcp.types as types +from cognee.shared.logging_utils import get_logger + +from src.shared import context + +logger = get_logger() + + +async def list_data(dataset_id: str = None) -> list: + """ + List all datasets and their data items with IDs for deletion operations. + + This function helps users identify data IDs and dataset IDs that can be used + with the delete tool. It provides a comprehensive view of available data. + + Parameters + ---------- + dataset_id : str, optional + If provided, only list data items from this specific dataset. + If None, lists all datasets and their data items. + Should be a valid UUID string. + + Returns + ------- + list + A list containing a single TextContent object with formatted information + about datasets and data items, including their IDs for deletion. 
+ + Notes + ----- + - Use this tool to identify data_id and dataset_id values for the delete tool + - The output includes both dataset information and individual data items + - UUIDs are displayed in a format ready for use with other tools + """ + + with redirect_stdout(sys.stderr): + try: + output_lines = [] + + if dataset_id: + # Detailed data listing for specific dataset is only available in direct mode + if context.cognee_client.use_api: + return [ + types.TextContent( + type="text", + text="❌ Detailed data listing for specific datasets is not available in API mode.\nPlease use the API directly or use direct mode.", + ) + ] + + from cognee.modules.users.methods import get_default_user + from cognee.modules.data.methods import get_dataset, get_dataset_data + + logger.info(f"Listing data for dataset: {dataset_id}") + dataset_uuid = UUID(dataset_id) + user = await get_default_user() + + dataset = await get_dataset(user.id, dataset_uuid) + + if not dataset: + return [ + types.TextContent(type="text", text=f"❌ Dataset not found: {dataset_id}") + ] + + # Get data items in the dataset + data_items = await get_dataset_data(dataset.id) + + output_lines.append(f"📁 Dataset: {dataset.name}") + output_lines.append(f" ID: {dataset.id}") + output_lines.append(f" Created: {dataset.created_at}") + output_lines.append(f" Data items: {len(data_items)}") + output_lines.append("") + + if data_items: + for i, data_item in enumerate(data_items, 1): + output_lines.append(f" 📄 Data item #{i}:") + output_lines.append(f" Data ID: {data_item.id}") + output_lines.append(f" Name: {data_item.name or 'Unnamed'}") + output_lines.append(f" Created: {data_item.created_at}") + output_lines.append("") + else: + output_lines.append(" (No data items in this dataset)") + + else: + # List all datasets - works in both modes + logger.info("Listing all datasets") + datasets = await context.cognee_client.list_datasets() + + if not datasets: + return [ + types.TextContent( + type="text", + text="📂 No 
datasets found.\nUse the cognify tool to create your first dataset!", + ) + ] + + output_lines.append("📂 Available Datasets:") + output_lines.append("=" * 50) + output_lines.append("") + + for i, dataset in enumerate(datasets, 1): + # In API mode, dataset is a dict; in direct mode, it's formatted as dict + if isinstance(dataset, dict): + output_lines.append(f"{i}. 📁 {dataset.get('name', 'Unnamed')}") + output_lines.append(f" Dataset ID: {dataset.get('id')}") + output_lines.append(f" Created: {dataset.get('created_at', 'N/A')}") + else: + output_lines.append(f"{i}. 📁 {dataset.name}") + output_lines.append(f" Dataset ID: {dataset.id}") + output_lines.append(f" Created: {dataset.created_at}") + output_lines.append("") + + if not context.cognee_client.use_api: + output_lines.append("💡 To see data items in a specific dataset, use:") + output_lines.append(' list_data(dataset_id="your-dataset-id-here")') + output_lines.append("") + output_lines.append("🗑️ To delete specific data, use:") + output_lines.append(' delete(data_id="data-id", dataset_id="dataset-id")') + + result_text = "\n".join(output_lines) + logger.info("List data operation completed successfully") + + return [types.TextContent(type="text", text=result_text)] + + except ValueError as e: + error_msg = f"❌ Invalid UUID format: {str(e)}" + logger.error(error_msg) + return [types.TextContent(type="text", text=error_msg)] + + except Exception as e: + error_msg = f"❌ Failed to list data: {str(e)}" + logger.error(f"List data error: {str(e)}") + return [types.TextContent(type="text", text=error_msg)] diff --git a/cognee-mcp/src/tools/prune.py b/cognee-mcp/src/tools/prune.py new file mode 100644 index 000000000..949441134 --- /dev/null +++ b/cognee-mcp/src/tools/prune.py @@ -0,0 +1,45 @@ +"""Tool for resetting the Cognee knowledge graph.""" + +import sys +from contextlib import redirect_stdout +import mcp.types as types +from cognee.shared.logging_utils import get_logger + +from src.shared import context + +logger = 
get_logger() + + +async def prune(): + """ + Reset the Cognee knowledge graph by removing all stored information. + + This function performs a complete reset of both the data layer and system layer + of the Cognee knowledge graph, removing all nodes, edges, and associated metadata. + It is typically used during development or when needing to start fresh with a new + knowledge base. + + Returns + ------- + list + A list containing a single TextContent object with confirmation of the prune operation. + + Notes + ----- + - This operation cannot be undone. All memory data will be permanently deleted. + - The function prunes both data content (using prune_data) and system metadata (using prune_system) + - This operation is not available in API mode + """ + with redirect_stdout(sys.stderr): + try: + await context.cognee_client.prune_data() + await context.cognee_client.prune_system(metadata=True) + return [types.TextContent(type="text", text="Pruned")] + except NotImplementedError: + error_msg = "❌ Prune operation is not available in API mode" + logger.error(error_msg) + return [types.TextContent(type="text", text=error_msg)] + except Exception as e: + error_msg = f"❌ Prune operation failed: {str(e)}" + logger.error(error_msg) + return [types.TextContent(type="text", text=error_msg)] diff --git a/cognee-mcp/src/tools/save_interaction.py b/cognee-mcp/src/tools/save_interaction.py new file mode 100644 index 000000000..c88e36393 --- /dev/null +++ b/cognee-mcp/src/tools/save_interaction.py @@ -0,0 +1,75 @@ +"""Tool for transforming and saving user-agent interactions into structured knowledge.""" + +import sys +import asyncio +from contextlib import redirect_stdout +import mcp.types as types +from cognee.shared.logging_utils import get_logger, get_log_file_location + +from src.shared import context + +logger = get_logger() + +# Import coding agent rules functions +try: + from cognee.tasks.codingagents.coding_rule_associations import add_rule_associations +except 
ModuleNotFoundError: + from src.codingagents.coding_rule_associations import add_rule_associations + + +async def save_interaction(data: str) -> list: + """ + Transform and save a user-agent interaction into structured knowledge. + + Parameters + ---------- + data : str + The input string containing user queries and corresponding agent answers. + + Returns + ------- + list + A list containing a single TextContent object with information about the background task launch. + """ + + async def save_user_agent_interaction(data: str) -> None: + """Build knowledge graph from the interaction data""" + with redirect_stdout(sys.stderr): + logger.info("Save interaction process starting.") + + await context.cognee_client.add(data, node_set=["user_agent_interaction"]) + + try: + await context.cognee_client.cognify() + logger.info("Save interaction process finished.") + + # Rule associations only work in direct mode + if not context.cognee_client.use_api: + logger.info("Generating associated rules from interaction data.") + await add_rule_associations(data=data, rules_nodeset_name="coding_agent_rules") + logger.info("Associated rules generated from interaction data.") + else: + logger.warning("Rule associations are not available in API mode, skipping.") + + except Exception as e: + logger.error("Save interaction process failed.") + raise ValueError(f"Failed to Save interaction: {str(e)}") + + asyncio.create_task( + save_user_agent_interaction( + data=data, + ) + ) + + log_file = get_log_file_location() + text = ( + f"Background process launched to process the user-agent interaction.\n" + f"To check the current status, use the cognify_status tool or check the log file at: {log_file}" + ) + + return [ + types.TextContent( + type="text", + text=text, + ) + ] diff --git a/cognee-mcp/src/tools/search.py b/cognee-mcp/src/tools/search.py new file mode 100644 index 000000000..1db6b6e0e --- /dev/null +++ b/cognee-mcp/src/tools/search.py @@ -0,0 +1,166 @@ +"""Tool for searching and 
async def search(search_query: str, search_type: str) -> list:
    """
    Search and query the knowledge graph for insights, information, and connections.

    This is the final step in the Cognee workflow that retrieves information from the
    processed knowledge graph. It supports multiple search modes optimized for different
    use cases - from simple fact retrieval to complex reasoning and code analysis.

    Search Prerequisites:
    - **LLM_API_KEY**: Required for GRAPH_COMPLETION and RAG_COMPLETION search types
    - **Data Added**: Must have data previously added via `cognee.add()`
    - **Knowledge Graph Built**: Must have processed data via `cognee.cognify()`
    - **Vector Database**: Must be accessible for semantic search functionality

    Search Types & Use Cases:

    **GRAPH_COMPLETION** (Recommended):
    Natural language Q&A using full graph context and LLM reasoning.
    Best for: Complex questions, analysis, summaries, insights.

    **RAG_COMPLETION**:
    Traditional RAG using document chunks without graph structure.
    Best for: Direct document retrieval, specific fact-finding.

    **CHUNKS**:
    Raw text segments that match the query semantically.
    Best for: Finding specific passages, citations, exact content.

    **SUMMARIES**:
    Pre-generated hierarchical summaries of content.
    Best for: Quick overviews, document abstracts, topic summaries.

    **CODE**:
    Code-specific search with syntax and semantic understanding.
    Best for: Finding functions, classes, implementation patterns.
    Returns: Structured code information (JSON) with context and relationships.

    **CYPHER**:
    Direct graph database queries using Cypher syntax.
    Best for: Advanced users, specific graph traversals, debugging.

    **FEELING_LUCKY**:
    Intelligently selects and runs the most appropriate search type.
    Best for: General-purpose queries or when you're unsure which search type is best.

    Parameters
    ----------
    search_query : str
        Your question or search query in natural language.
        Examples:
        - "What are the main themes in this research?"
        - "How do these concepts relate to each other?"
        - "What functions handle user authentication?"

    search_type : str
        The type of search to perform; case-insensitive (converted to uppercase).
        Valid options include: "GRAPH_COMPLETION", "RAG_COMPLETION", "CODE",
        "CHUNKS", "SUMMARIES", "CYPHER", "FEELING_LUCKY".

    Returns
    -------
    list
        A list containing a single TextContent object with the search results.
        The format of the text depends on the search_type:
        - GRAPH_COMPLETION/RAG_COMPLETION: conversational AI response strings
        - CHUNKS: relevant text passages with source metadata
        - SUMMARIES: hierarchical summaries from general to specific
        - CODE: structured code information serialized as JSON
        - FEELING_LUCKY: results in the format of the auto-selected search type
        - CYPHER: raw graph query results

    Environment Variables:
        Required for LLM-based search types (GRAPH_COMPLETION, RAG_COMPLETION):
        - LLM_API_KEY: API key for your LLM provider

        Optional:
        - LLM_PROVIDER, LLM_MODEL: Configure LLM for search responses
        - VECTOR_DB_PROVIDER: Must match what was used during cognify
        - GRAPH_DATABASE_PROVIDER: Must match what was used during cognify

    Notes
    -----
    - Different search types produce different output formats
    - The function handles the conversion between Cognee's internal result
      format and MCP's output format
    """

    async def search_task(query: str, query_type: str) -> str:
        """Run the search and normalize the result into a display string."""
        # Normalize once; every branch below compares against the upper-cased type.
        normalized_type = query_type.upper()

        # NOTE: MCP uses stdout to communicate, we must redirect all output
        # going to stdout ( like the print function ) to stderr.
        with redirect_stdout(sys.stderr):
            search_results = await context.cognee_client.search(
                query_text=query, query_type=query_type
            )

            if context.cognee_client.use_api:
                # API mode returns JSON-serialized results
                if isinstance(search_results, str):
                    return search_results
                if isinstance(search_results, list):
                    if (
                        normalized_type in ("GRAPH_COMPLETION", "RAG_COMPLETION")
                        and len(search_results) > 0
                    ):
                        return str(search_results[0])
                    return str(search_results)
                return json.dumps(search_results, cls=JSONEncoder)

            # Direct mode processing
            if normalized_type == "CODE":
                return json.dumps(search_results, cls=JSONEncoder)
            if normalized_type in ("GRAPH_COMPLETION", "RAG_COMPLETION"):
                # Guard against an empty result list (mirrors the API-mode
                # check) instead of raising IndexError.
                if len(search_results) > 0:
                    return str(search_results[0])
                return str(search_results)
            if normalized_type == "CHUNKS":
                return str(search_results)
            if normalized_type == "INSIGHTS":
                return retrieved_edges_to_string(search_results)
            return str(search_results)

    search_results = await search_task(search_query, search_type)
    return [types.TextContent(type="text", text=search_results)]
+""" + +import os +import importlib.util + + +def node_to_string(node): + """Convert a node dictionary to a string representation.""" + node_data = ", ".join( + [f'{key}: "{value}"' for key, value in node.items() if key in ["id", "name"]] + ) + return f"Node({node_data})" + + +def retrieved_edges_to_string(search_results): + """Convert graph search results (triplets) to human-readable strings.""" + edge_strings = [] + for triplet in search_results: + node1, edge, node2 = triplet + relationship_type = edge["relationship_name"] + edge_str = f"{node_to_string(node1)} {relationship_type} {node_to_string(node2)}" + edge_strings.append(edge_str) + return "\n".join(edge_strings) + + +def load_class(model_file, model_name): + """Dynamically load a class from a file.""" + model_file = os.path.abspath(model_file) + spec = importlib.util.spec_from_file_location("graph_model", model_file) + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) + model_class = getattr(module, model_name) + return model_class