Compare commits
9 commits
main
...
refactor/r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f2b86b3508 | ||
|
|
ba9377f036 | ||
|
|
14164ac2ec | ||
|
|
27cfe8e323 | ||
|
|
661084aa57 | ||
|
|
9adb54a761 | ||
|
|
5e2b3c2e59 | ||
|
|
f467dc9b04 | ||
|
|
6996cdb887 |
17 changed files with 767 additions and 1136 deletions
|
|
@ -40,3 +40,11 @@ allow-direct-references = true
|
|||
[project.scripts]
|
||||
cognee = "src:main"
|
||||
cognee-mcp = "src:main_mcp"
|
||||
|
||||
[tool.pyright]
|
||||
typeCheckingMode = "basic"
|
||||
reportMissingImports = "error"
|
||||
reportUndefinedVariable = "error"
|
||||
reportMissingModuleSource = "error"
|
||||
reportUnusedImport = "warning"
|
||||
reportUnusedVariable = "warning"
|
||||
|
|
|
|||
|
|
@ -1,52 +0,0 @@
|
|||
from datetime import timedelta

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


# Server parameters for the stdio transport: launch the cognee MCP server
# through `uv run` in the current directory.
server_params = StdioServerParameters(
    command="uv",  # Executable
    args=["--directory", ".", "run", "cognee"],  # Optional command line arguments
    env=None,  # Optional environment variables
)

# NOTE(review): this sample text is never sent to the server below —
# presumably it was meant to be the payload of the "cognify" call;
# confirm the tool's expected argument name before wiring it in.
text = """
Artificial intelligence, or AI, is technology that enables computers
and machines to simulate human intelligence and problem-solving
capabilities.
On its own or combined with other technologies (e.g., sensors,
geolocation, robotics) AI can perform tasks that would otherwise
require human intelligence or intervention. Digital assistants, GPS
guidance, autonomous vehicles, and generative AI tools (like Open
AI's Chat GPT) are just a few examples of AI in the daily news and
our daily lives.
As a field of computer science, artificial intelligence encompasses
(and is often mentioned together with) machine learning and deep
learning. These disciplines involve the development of AI
algorithms, modeled after the decision-making processes of the human
brain, that can ‘learn’ from available data and make increasingly
more accurate classifications or predictions over time.
"""


async def run():
    """Exercise the cognee MCP server end to end over stdio.

    Connects, lists the available tools, then runs prune -> cognify ->
    search and prints the final search result.
    """
    async with stdio_client(server_params) as (read, write):
        # Three-minute read timeout: cognify can be slow.
        async with ClientSession(read, write, timedelta(minutes=3)) as session:
            await session.initialize()

            toolResult = await session.list_tools()

            toolResult = await session.call_tool("prune", arguments={})

            toolResult = await session.call_tool("cognify", arguments={})

            toolResult = await session.call_tool(
                "search", arguments={"search_type": "GRAPH_COMPLETION"}
            )

            # BUG FIX: at this point `toolResult` holds the response of the
            # "search" call above, not the earlier "cognify" call — the old
            # label "Cognify result" was misleading.
            print(f"Search result: {toolResult.content}")


if __name__ == "__main__":
    import asyncio

    asyncio.run(run())
|
||||
3
cognee-mcp/src/clients/__init__.py
Normal file
3
cognee-mcp/src/clients/__init__.py
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
# Package facade: expose CogneeClient as the package's single public name,
# so callers can write `from src.clients import CogneeClient`.
from .cognee_client import CogneeClient

__all__ = ["CogneeClient"]
|
||||
|
|
@ -17,6 +17,7 @@ import json
|
|||
logger = get_logger()
|
||||
|
||||
|
||||
# TODO(daulet) COG-3311: I'm exploring OpenAPI json client generation for backend
|
||||
class CogneeClient:
|
||||
"""
|
||||
Unified client for interacting with Cognee via direct calls or HTTP API.
|
||||
|
|
@ -1,120 +0,0 @@
|
|||
from uuid import NAMESPACE_OID, uuid5
|
||||
|
||||
from cognee.infrastructure.databases.graph import get_graph_engine
|
||||
from cognee.infrastructure.databases.vector import get_vector_engine
|
||||
|
||||
from cognee.low_level import DataPoint
|
||||
from cognee.infrastructure.llm import LLMGateway
|
||||
from cognee.shared.logging_utils import get_logger
|
||||
from cognee.modules.engine.models import NodeSet
|
||||
from cognee.tasks.storage import add_data_points, index_graph_edges
|
||||
from typing import Optional, List, Any
|
||||
from pydantic import Field
|
||||
|
||||
logger = get_logger("coding_rule_association")
|
||||
|
||||
|
||||
class Rule(DataPoint):
    """A single developer rule extracted from text.

    The rule body is stored in ``text``; ``belongs_to_set`` links the rule
    to the NodeSet it was filed under.
    """

    text: str = Field(..., description="The coding rule associated with the conversation")
    belongs_to_set: Optional[NodeSet] = None
    # BUG FIX: index_fields must name an actual attribute of this model.
    # The rule body lives in `text` — there is no `rule` field — so the
    # previous value ["rule"] pointed indexing at a nonexistent field.
    metadata: dict = {"index_fields": ["text"]}
|
||||
|
||||
|
||||
class RuleSet(DataPoint):
    """Collection of parsed rules."""

    # Structured-output target for the LLM: the model must return one
    # ``Rule`` per guideline it finds in the input conversation.
    rules: List[Rule] = Field(
        ...,
        description="List of developer rules extracted from the input text. Each rule represents a coding best practice or guideline.",
    )
|
||||
|
||||
|
||||
async def get_existing_rules(rules_nodeset_name: str) -> str:
    """Return the rules already stored under *rules_nodeset_name*.

    Fetches the NodeSet subgraph from the graph engine and formats every
    node payload that carries a ``text`` entry as a "- <rule>" bullet,
    one per line.
    """
    graph_engine = await get_graph_engine()
    nodes_data, _ = await graph_engine.get_nodeset_subgraph(
        node_type=NodeSet, node_name=[rules_nodeset_name]
    )

    bullet_lines = []
    for entry in nodes_data:
        # Only well-formed (node_id, properties) pairs are considered.
        if not (isinstance(entry, tuple) and len(entry) == 2):
            continue
        payload = entry[1]
        if isinstance(payload, dict) and "text" in payload:
            bullet_lines.append(f"- {payload['text']}")

    return "\n".join(bullet_lines)
|
||||
|
||||
|
||||
async def get_origin_edges(data: str, rules: List[Rule]) -> list[Any]:
    """Build edges linking each rule back to the chunk it originated from.

    Looks up the best-matching document chunk for *data* in the vector
    store and, for every rule that has an ``id``, emits a
    ``rule_associated_from`` edge tuple pointing from the rule to that
    origin chunk. Returns an empty list when no origin or no rules exist.
    """
    vector_engine = get_vector_engine()

    origin_chunk = await vector_engine.search("DocumentChunk_text", data, limit=1)

    # The search result shape is defensive territory: tolerate empty or
    # malformed results and fall back to "no origin".
    try:
        origin_id = origin_chunk[0].id
    except (AttributeError, KeyError, TypeError, IndexError):
        origin_id = None

    relationships = []

    # Guard clause: nothing to associate without both an origin and rules.
    if not (origin_id and isinstance(rules, (list, tuple)) and len(rules) > 0):
        logger.info("No valid origin_id or rules provided.")
        return relationships

    rel_name = "rule_associated_from"
    for rule in rules:
        try:
            rule_id = getattr(rule, "id", None)
            if rule_id is None:
                continue
            edge_properties = {
                "relationship_name": rel_name,
                "source_node_id": rule_id,
                "target_node_id": origin_id,
                "ontology_valid": False,
            }
            relationships.append((rule_id, origin_id, rel_name, edge_properties))
        except Exception as e:
            # Best-effort: a single bad rule must not abort the batch.
            logger.info(f"Warning: Skipping invalid rule due to error: {e}")

    return relationships
|
||||
|
||||
|
||||
async def add_rule_associations(data: str, rules_nodeset_name: str):
    """Extract coding rules from *data* via the LLM and persist them.

    Renders user/system prompts (including the rules already stored under
    *rules_nodeset_name* so the LLM can avoid duplicates), asks the LLM for
    a structured ``RuleSet``, attaches every new rule to the named NodeSet,
    then saves the rules and their origin edges.

    Parameters
    ----------
    data : str
        The conversation/text to mine for rules.
    rules_nodeset_name : str
        Name of the NodeSet grouping the rules.
    """
    graph_engine = await get_graph_engine()
    existing_rules = await get_existing_rules(rules_nodeset_name=rules_nodeset_name)

    user_context = {"chat": data, "rules": existing_rules}

    user_prompt = LLMGateway.render_prompt(
        "coding_rule_association_agent_user.txt", context=user_context
    )
    system_prompt = LLMGateway.render_prompt("coding_rule_association_agent_system.txt", context={})

    # Structured output: the LLM response is parsed directly into RuleSet.
    rule_list = await LLMGateway.acreate_structured_output(
        text_input=user_prompt, system_prompt=system_prompt, response_model=RuleSet
    )

    # Deterministic NodeSet id: uuid5 over the set name, so repeated runs
    # reuse the same NodeSet node instead of creating duplicates.
    rules_nodeset = NodeSet(
        id=uuid5(NAMESPACE_OID, name=rules_nodeset_name), name=rules_nodeset_name
    )
    for rule in rule_list.rules:
        rule.belongs_to_set = rules_nodeset

    # Edges must be computed before the rules are persisted, but added
    # after — add_data_points creates the rule nodes the edges point at.
    edges_to_save = await get_origin_edges(data=data, rules=rule_list.rules)

    await add_data_points(data_points=rule_list.rules)

    if len(edges_to_save) > 0:
        await graph_engine.add_edges(edges_to_save)
        await index_graph_edges(edges_to_save)
|
||||
File diff suppressed because it is too large
Load diff
3
cognee-mcp/src/shared/__init__.py
Normal file
3
cognee-mcp/src/shared/__init__.py
Normal file
|
|
@ -0,0 +1,3 @@
|
|||
# Re-export the context module so tools can write `from src.shared import context`.
from . import context

__all__ = ["context"]
|
||||
11
cognee-mcp/src/shared/context.py
Normal file
11
cognee-mcp/src/shared/context.py
Normal file
|
|
@ -0,0 +1,11 @@
|
|||
from typing import Optional

from src.clients.cognee_client import CogneeClient


# Process-wide CogneeClient shared by the MCP tool modules. Populated once
# via set_cognee_client() (presumably at server startup — confirm against
# the server entry point) and read by tools as ``context.cognee_client``.
cognee_client: Optional["CogneeClient"] = None


def set_cognee_client(client: "CogneeClient") -> None:
    """Set the global cognee client instance."""
    global cognee_client
    cognee_client = client
|
||||
17
cognee-mcp/src/tools/__init__.py
Normal file
17
cognee-mcp/src/tools/__init__.py
Normal file
|
|
@ -0,0 +1,17 @@
|
|||
"""Cognee MCP Tools - All tools for interacting with the Cognee knowledge graph."""

from .cognify import cognify
from .search import search
from .list_data import list_data
from .delete import delete
from .prune import prune
from .cognify_status import cognify_status

# Public API: one coroutine per MCP tool, re-exported for the server.
__all__ = [
    "cognify",
    "search",
    "list_data",
    "delete",
    "prune",
    "cognify_status",
]
|
||||
178
cognee-mcp/src/tools/cognify.py
Normal file
178
cognee-mcp/src/tools/cognify.py
Normal file
|
|
@ -0,0 +1,178 @@
|
|||
"""Tool for transforming data into a structured knowledge graph."""
|
||||
|
||||
import sys
|
||||
import asyncio
|
||||
from contextlib import redirect_stdout
|
||||
import mcp.types as types
|
||||
from cognee.shared.logging_utils import get_logger, get_log_file_location
|
||||
|
||||
from src.shared import context
|
||||
from .utils import load_class
|
||||
|
||||
logger = get_logger()
|
||||
|
||||
|
||||
async def cognify(
    data: str, graph_model_file: str = None, graph_model_name: str = None, custom_prompt: str = None
) -> list:
    """
    Transform ingested data into a structured knowledge graph.

    Adds *data* and runs the cognify pipeline (entity extraction,
    relationship detection, graph construction) as a background task —
    MCP responses must return quickly, so the actual processing continues
    after this call returns — and immediately reports where to check
    progress.

    Parameters
    ----------
    data : str
        Data to process: natural language, a file location, or any
        text-based information to add to the knowledge graph.
    graph_model_file : str, optional
        Path to a file declaring a custom graph model class, loaded via
        importlib. Ignored (with a warning) in API mode. Default None uses
        Cognee's built-in KnowledgeGraph model.
    graph_model_name : str, optional
        Name of the class inside ``graph_model_file`` to instantiate.
        Required when ``graph_model_file`` is given.
    custom_prompt : str, optional
        Prompt overriding the default prompts for entity/relationship
        extraction.

    Returns
    -------
    list
        A single TextContent item describing the background launch and how
        to check its status (cognify_status tool, or the log file).

    Notes
    -----
    - Requires LLM_API_KEY plus reachable vector and graph databases.
    - Processing time depends on input size; poll with cognify_status.
    """

    async def cognify_task(
        data: str,
        graph_model_file: str = None,
        graph_model_name: str = None,
        custom_prompt: str = None,
    ) -> str:
        """Build knowledge graph from the input text"""
        # NOTE: MCP uses stdout to communicate, we must redirect all output
        # going to stdout ( like the print function ) to stderr.
        with redirect_stdout(sys.stderr):
            logger.info("Cognify process starting.")

            graph_model = None
            if graph_model_file and graph_model_name:
                if context.cognee_client.use_api:
                    logger.warning("Custom graph models are not supported in API mode, ignoring.")
                else:
                    # FIX: dropped an unused `KnowledgeGraph` import that was
                    # here — the custom class comes solely from load_class().
                    graph_model = load_class(graph_model_file, graph_model_name)

            await context.cognee_client.add(data)

            try:
                await context.cognee_client.cognify(
                    custom_prompt=custom_prompt, graph_model=graph_model
                )
                logger.info("Cognify process finished.")
            except Exception as e:
                logger.error("Cognify process failed.")
                # FIX: chain the original exception so the root cause
                # survives in the traceback (`raise ... from e`).
                raise ValueError(f"Failed to cognify: {str(e)}") from e

    # Fire-and-forget: the MCP response below returns immediately.
    asyncio.create_task(
        cognify_task(
            data=data,
            graph_model_file=graph_model_file,
            graph_model_name=graph_model_name,
            custom_prompt=custom_prompt,
        )
    )

    log_file = get_log_file_location()
    text = (
        f"Background process launched due to MCP timeout limitations.\n"
        f"To check current cognify status use the cognify_status tool\n"
        f"or check the log file at: {log_file}"
    )

    return [
        types.TextContent(
            type="text",
            text=text,
        )
    ]
|
||||
51
cognee-mcp/src/tools/cognify_status.py
Normal file
51
cognee-mcp/src/tools/cognify_status.py
Normal file
|
|
@ -0,0 +1,51 @@
|
|||
"""Tool for getting the status of the cognify pipeline."""
|
||||
|
||||
import sys
|
||||
from contextlib import redirect_stdout
|
||||
import mcp.types as types
|
||||
from cognee.shared.logging_utils import get_logger
|
||||
|
||||
from src.shared import context
|
||||
|
||||
logger = get_logger()
|
||||
|
||||
|
||||
async def cognify_status():
    """
    Report the current status of the cognify pipeline.

    Queries the "cognify_pipeline" status for the "main_dataset" of the
    default user and returns it as readable text, covering active and
    recently completed jobs.

    Returns
    -------
    list
        One TextContent item carrying the status string, or an error
        message when the lookup fails.

    Notes
    -----
    - Not available in API mode.
    """
    with redirect_stdout(sys.stderr):
        try:
            from cognee.modules.data.methods.get_unique_dataset_id import get_unique_dataset_id
            from cognee.modules.users.methods import get_default_user

            default_user = await get_default_user()
            dataset_ids = [await get_unique_dataset_id("main_dataset", default_user)]
            pipeline_status = await context.cognee_client.get_pipeline_status(
                dataset_ids, "cognify_pipeline"
            )
            status_text = str(pipeline_status)
        except NotImplementedError:
            error_msg = "❌ Pipeline status is not available in API mode"
            logger.error(error_msg)
            return [types.TextContent(type="text", text=error_msg)]
        except Exception as e:
            error_msg = f"❌ Failed to get cognify status: {str(e)}"
            logger.error(error_msg)
            return [types.TextContent(type="text", text=error_msg)]
        else:
            return [types.TextContent(type="text", text=status_text)]
|
||||
90
cognee-mcp/src/tools/delete.py
Normal file
90
cognee-mcp/src/tools/delete.py
Normal file
|
|
@ -0,0 +1,90 @@
|
|||
"""Tool for deleting specific data from a dataset."""
|
||||
|
||||
import sys
|
||||
import json
|
||||
from uuid import UUID
|
||||
from contextlib import redirect_stdout
|
||||
import mcp.types as types
|
||||
from cognee.shared.logging_utils import get_logger
|
||||
from cognee.modules.storage.utils import JSONEncoder
|
||||
|
||||
from src.shared import context
|
||||
|
||||
logger = get_logger()
|
||||
|
||||
|
||||
async def delete(data_id: str, dataset_id: str, mode: str = "soft") -> list:
    """
    Delete one data item from a dataset in the Cognee knowledge graph.

    The dataset itself is kept; only the addressed data item is removed.

    Parameters
    ----------
    data_id : str
        UUID string of the data item to remove.
    dataset_id : str
        UUID string of the dataset containing that item.
    mode : str, optional
        "soft" (default) removes the data but keeps related entities that
        might be shared; "hard" also removes degree-one entity nodes that
        become orphaned by the deletion.

    Returns
    -------
    list
        One TextContent item with the deletion outcome, or an error message.

    Notes
    -----
    - Irreversible: the specified data is permanently removed.
    - For wiping everything, use the prune tool instead.
    """

    with redirect_stdout(sys.stderr):
        try:
            logger.info(
                f"Starting delete operation for data_id: {data_id}, dataset_id: {dataset_id}, mode: {mode}"
            )

            # UUID() raises ValueError on malformed input; reported below.
            result = await context.cognee_client.delete(
                data_id=UUID(data_id), dataset_id=UUID(dataset_id), mode=mode
            )

            logger.info(f"Delete operation completed successfully: {result}")

            # Serialize the client's result for the MCP response.
            payload = json.dumps(result, indent=2, cls=JSONEncoder)
            message = f"✅ Delete operation completed successfully!\n\n{payload}"
            return [types.TextContent(type="text", text=message)]

        except ValueError as e:
            # Covers malformed UUID strings.
            error_msg = f"❌ Invalid UUID format: {str(e)}"
            logger.error(error_msg)
            return [types.TextContent(type="text", text=error_msg)]

        except Exception as e:
            # DocumentNotFoundError, DatasetNotFoundError, transport errors…
            error_msg = f"❌ Delete operation failed: {str(e)}"
            logger.error(f"Delete operation error: {str(e)}")
            return [types.TextContent(type="text", text=error_msg)]
|
||||
137
cognee-mcp/src/tools/list_data.py
Normal file
137
cognee-mcp/src/tools/list_data.py
Normal file
|
|
@ -0,0 +1,137 @@
|
|||
"""Tool for listing datasets and their data items."""
|
||||
|
||||
import sys
|
||||
from uuid import UUID
|
||||
from contextlib import redirect_stdout
|
||||
import mcp.types as types
|
||||
from cognee.shared.logging_utils import get_logger
|
||||
|
||||
from src.shared import context
|
||||
|
||||
logger = get_logger()
|
||||
|
||||
|
||||
async def list_data(dataset_id: str = None) -> list:
    """
    List all datasets and their data items with IDs for deletion operations.

    This function helps users identify data IDs and dataset IDs that can be used
    with the delete tool. It provides a comprehensive view of available data.

    Parameters
    ----------
    dataset_id : str, optional
        If provided, only list data items from this specific dataset.
        If None, lists all datasets and their data items.
        Should be a valid UUID string.

    Returns
    -------
    list
        A list containing a single TextContent object with formatted information
        about datasets and data items, including their IDs for deletion.

    Notes
    -----
    - Use this tool to identify data_id and dataset_id values for the delete tool
    - UUIDs are displayed in a format ready for use with other tools
    - Per-dataset detail requires direct mode; dataset listing works in both modes
    """

    with redirect_stdout(sys.stderr):
        try:
            output_lines = []

            if dataset_id:
                # Detailed data listing for specific dataset is only available in direct mode
                if context.cognee_client.use_api:
                    return [
                        types.TextContent(
                            type="text",
                            text="❌ Detailed data listing for specific datasets is not available in API mode.\nPlease use the API directly or use direct mode.",
                        )
                    ]

                # Deferred import: these cognee modules only exist in direct mode.
                from cognee.modules.users.methods import get_default_user
                from cognee.modules.data.methods import get_dataset, get_dataset_data

                logger.info(f"Listing data for dataset: {dataset_id}")
                # UUID() raises ValueError on malformed input, caught below.
                dataset_uuid = UUID(dataset_id)
                user = await get_default_user()

                dataset = await get_dataset(user.id, dataset_uuid)

                if not dataset:
                    return [
                        types.TextContent(type="text", text=f"❌ Dataset not found: {dataset_id}")
                    ]

                # Get data items in the dataset
                data_items = await get_dataset_data(dataset.id)

                output_lines.append(f"📁 Dataset: {dataset.name}")
                output_lines.append(f"   ID: {dataset.id}")
                output_lines.append(f"   Created: {dataset.created_at}")
                output_lines.append(f"   Data items: {len(data_items)}")
                output_lines.append("")

                if data_items:
                    for i, data_item in enumerate(data_items, 1):
                        output_lines.append(f"   📄 Data item #{i}:")
                        output_lines.append(f"      Data ID: {data_item.id}")
                        output_lines.append(f"      Name: {data_item.name or 'Unnamed'}")
                        output_lines.append(f"      Created: {data_item.created_at}")
                        output_lines.append("")
                else:
                    output_lines.append("   (No data items in this dataset)")

            else:
                # List all datasets - works in both modes
                logger.info("Listing all datasets")
                datasets = await context.cognee_client.list_datasets()

                if not datasets:
                    return [
                        types.TextContent(
                            type="text",
                            text="📂 No datasets found.\nUse the cognify tool to create your first dataset!",
                        )
                    ]

                output_lines.append("📂 Available Datasets:")
                output_lines.append("=" * 50)
                output_lines.append("")

                for i, dataset in enumerate(datasets, 1):
                    # In API mode, dataset is a dict; in direct mode, it's formatted as dict
                    if isinstance(dataset, dict):
                        output_lines.append(f"{i}. 📁 {dataset.get('name', 'Unnamed')}")
                        output_lines.append(f"   Dataset ID: {dataset.get('id')}")
                        output_lines.append(f"   Created: {dataset.get('created_at', 'N/A')}")
                    else:
                        output_lines.append(f"{i}. 📁 {dataset.name}")
                        output_lines.append(f"   Dataset ID: {dataset.id}")
                        output_lines.append(f"   Created: {dataset.created_at}")
                    output_lines.append("")

                # Usage hints for the direct-mode-only follow-up tools.
                if not context.cognee_client.use_api:
                    output_lines.append("💡 To see data items in a specific dataset, use:")
                    output_lines.append('   list_data(dataset_id="your-dataset-id-here")')
                    output_lines.append("")
                    output_lines.append("🗑️ To delete specific data, use:")
                    output_lines.append('   delete(data_id="data-id", dataset_id="dataset-id")')

            result_text = "\n".join(output_lines)
            logger.info("List data operation completed successfully")

            return [types.TextContent(type="text", text=result_text)]

        except ValueError as e:
            error_msg = f"❌ Invalid UUID format: {str(e)}"
            logger.error(error_msg)
            return [types.TextContent(type="text", text=error_msg)]

        except Exception as e:
            error_msg = f"❌ Failed to list data: {str(e)}"
            logger.error(f"List data error: {str(e)}")
            return [types.TextContent(type="text", text=error_msg)]
|
||||
45
cognee-mcp/src/tools/prune.py
Normal file
45
cognee-mcp/src/tools/prune.py
Normal file
|
|
@ -0,0 +1,45 @@
|
|||
"""Tool for resetting the Cognee knowledge graph."""
|
||||
|
||||
import sys
|
||||
from contextlib import redirect_stdout
|
||||
import mcp.types as types
|
||||
from cognee.shared.logging_utils import get_logger
|
||||
|
||||
from src.shared import context
|
||||
|
||||
logger = get_logger()
|
||||
|
||||
|
||||
async def prune():
    """
    Reset the Cognee knowledge graph by removing all stored information.

    Wipes both the data layer (prune_data) and the system layer
    (prune_system with metadata), leaving an empty knowledge base.

    Returns
    -------
    list
        One TextContent item confirming the prune, or an error message.

    Notes
    -----
    - Irreversible: all memory data is permanently deleted.
    - Not available in API mode.
    """
    with redirect_stdout(sys.stderr):
        try:
            client = context.cognee_client
            await client.prune_data()
            await client.prune_system(metadata=True)
        except NotImplementedError:
            error_msg = "❌ Prune operation is not available in API mode"
            logger.error(error_msg)
            return [types.TextContent(type="text", text=error_msg)]
        except Exception as e:
            error_msg = f"❌ Prune operation failed: {str(e)}"
            logger.error(error_msg)
            return [types.TextContent(type="text", text=error_msg)]
        return [types.TextContent(type="text", text="Pruned")]
|
||||
166
cognee-mcp/src/tools/search.py
Normal file
166
cognee-mcp/src/tools/search.py
Normal file
|
|
@ -0,0 +1,166 @@
|
|||
"""Tool for searching and querying the knowledge graph."""
|
||||
|
||||
import sys
|
||||
import json
|
||||
from contextlib import redirect_stdout
|
||||
import mcp.types as types
|
||||
from cognee.shared.logging_utils import get_logger
|
||||
from cognee.modules.storage.utils import JSONEncoder
|
||||
|
||||
from src.shared import context
|
||||
from .utils import retrieved_edges_to_string
|
||||
|
||||
logger = get_logger()
|
||||
|
||||
|
||||
async def search(search_query: str, search_type: str) -> list:
    """
    Search and query the knowledge graph for insights, information, and connections.

    This is the final step in the Cognee workflow: it retrieves information from the
    previously built knowledge graph. It supports multiple search modes optimized for
    different use cases, from raw chunk retrieval to LLM-backed graph reasoning.

    Prerequisites
    -------------
    - LLM_API_KEY set for LLM-backed search types (GRAPH_COMPLETION, RAG_COMPLETION).
    - Data added via ``cognee.add()`` and processed via ``cognee.cognify()``.
    - The vector / graph database providers must match those used during cognify.

    Parameters
    ----------
    search_query : str
        The question or query in natural language, e.g.
        "What are the main themes in this research?".
    search_type : str
        The search mode to use (case-insensitive; converted to uppercase):
        - "GRAPH_COMPLETION": LLM answer using full graph context (recommended).
        - "RAG_COMPLETION": LLM answer using document chunks without graph structure.
        - "CODE": code-related knowledge returned as JSON.
        - "CHUNKS": raw matching text chunks.
        - "SUMMARIES": pre-generated hierarchical summaries.
        - "CYPHER": direct graph database queries.
        - "FEELING_LUCKY": automatically selects the best search type.

    Returns
    -------
    list
        A list containing a single TextContent object with the search results.
        The text format depends on ``search_type``: conversational strings for
        the *_COMPLETION modes, JSON for CODE, stringified lists otherwise.

    Notes
    -----
    - Different search types produce different output formats.
    - This function converts Cognee's internal result format to MCP's output format.
    """

    async def search_task(search_query: str, search_type: str) -> str:
        """Run the search and normalize the result to a single string."""
        # NOTE: MCP uses stdout to communicate; we must redirect all output
        # going to stdout (like the print function) to stderr.
        with redirect_stdout(sys.stderr):
            search_results = await context.cognee_client.search(
                query_text=search_query, query_type=search_type
            )

            # Normalize once instead of calling .upper() in every branch.
            normalized_type = search_type.upper()

            # Handle different result formats based on API vs direct mode.
            if context.cognee_client.use_api:
                # API mode returns JSON-serialized results.
                if isinstance(search_results, str):
                    return search_results
                elif isinstance(search_results, list):
                    if (
                        normalized_type in ["GRAPH_COMPLETION", "RAG_COMPLETION"]
                        and len(search_results) > 0
                    ):
                        return str(search_results[0])
                    return str(search_results)
                else:
                    return json.dumps(search_results, cls=JSONEncoder)
            else:
                # Direct mode processing.
                if normalized_type == "CODE":
                    return json.dumps(search_results, cls=JSONEncoder)
                elif normalized_type in ("GRAPH_COMPLETION", "RAG_COMPLETION"):
                    # Guard against empty results before indexing: the API-mode
                    # branch above checks emptiness, and without this guard an
                    # empty result list raised IndexError here.
                    if search_results:
                        return str(search_results[0])
                    return str(search_results)
                elif normalized_type == "CHUNKS":
                    return str(search_results)
                elif normalized_type == "INSIGHTS":
                    # Triplet results are rendered as human-readable edge lines.
                    return retrieved_edges_to_string(search_results)
                else:
                    return str(search_results)

    search_results = await search_task(search_query, search_type)
    return [types.TextContent(type="text", text=search_results)]
|
||||
39
cognee-mcp/src/tools/utils.py
Normal file
39
cognee-mcp/src/tools/utils.py
Normal file
|
|
@ -0,0 +1,39 @@
|
|||
"""
|
||||
Utility functions for cognee tools.
|
||||
"""
|
||||
|
||||
import os
|
||||
import importlib.util
|
||||
|
||||
|
||||
def node_to_string(node):
    """Render a node dict as a compact ``Node(...)`` string.

    Only the "id" and "name" entries are included, in the dict's own
    iteration order; every other key is ignored.
    """
    parts = []
    for field, value in node.items():
        if field in ("id", "name"):
            parts.append(f'{field}: "{value}"')
    return "Node(" + ", ".join(parts) + ")"
|
||||
|
||||
|
||||
def retrieved_edges_to_string(search_results):
    """Format graph triplets as human-readable edge descriptions, one per line.

    Each item of *search_results* is a (source_node, edge, target_node) triplet;
    the edge dict must carry a "relationship_name" entry.
    """
    return "\n".join(
        f'{node_to_string(source)} {edge["relationship_name"]} {node_to_string(target)}'
        for source, edge, target in search_results
    )
|
||||
|
||||
|
||||
def load_class(model_file, model_name):
    """Import and return the attribute *model_name* from the Python file *model_file*.

    The file is loaded under the fixed module name "graph_model". Raises
    ValueError when no import spec can be built for the file, and ImportError
    when the spec has no loader.
    """
    absolute_path = os.path.abspath(model_file)

    spec = importlib.util.spec_from_file_location("graph_model", absolute_path)
    if spec is None:
        raise ValueError(f"Could not load specification for module from file: {absolute_path}")
    if spec.loader is None:
        raise ImportError(f"Spec loader is None for module file: {absolute_path}")

    loaded_module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(loaded_module)
    return getattr(loaded_module, model_name)
|
||||
Loading…
Add table
Reference in a new issue