Compare commits
2 commits
main
...
tests/cogn
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e4b8b0e9b2 | ||
|
|
f9059d7e0c |
11 changed files with 4136 additions and 4468 deletions
|
|
@ -32,6 +32,11 @@ packages = ["src"]
|
||||||
[dependency-groups]
|
[dependency-groups]
|
||||||
dev = [
|
dev = [
|
||||||
"debugpy>=1.8.12,<2.0.0",
|
"debugpy>=1.8.12,<2.0.0",
|
||||||
|
"pytest>=8.0.0,<9.0.0",
|
||||||
|
"pytest-asyncio>=0.24.0,<1.0.0",
|
||||||
|
"pytest-mock>=3.14.0,<4.0.0",
|
||||||
|
"pytest-cov>=6.0.0,<7.0.0",
|
||||||
|
"ruff>=0.14.3",
|
||||||
]
|
]
|
||||||
|
|
||||||
[tool.hatch.metadata]
|
[tool.hatch.metadata]
|
||||||
|
|
|
||||||
8
cognee-mcp/pytest.ini
Normal file
8
cognee-mcp/pytest.ini
Normal file
|
|
@ -0,0 +1,8 @@
|
||||||
|
[pytest]
|
||||||
|
asyncio_mode = auto
|
||||||
|
asyncio_default_fixture_loop_scope = function
|
||||||
|
testpaths = tests
|
||||||
|
python_files = test_*.py
|
||||||
|
python_classes = Test*
|
||||||
|
python_functions = test_*
|
||||||
|
|
||||||
|
|
@ -1,745 +0,0 @@
|
||||||
#!/usr/bin/env python3
|
|
||||||
"""
|
|
||||||
Test client for Cognee MCP Server functionality.
|
|
||||||
|
|
||||||
This script tests all the tools and functions available in the Cognee MCP server,
|
|
||||||
including cognify, codify, search, prune, status checks, and utility functions.
|
|
||||||
|
|
||||||
Usage:
|
|
||||||
# Set your OpenAI API key first
|
|
||||||
export OPENAI_API_KEY="your-api-key-here"
|
|
||||||
|
|
||||||
# Run the test client
|
|
||||||
python src/test_client.py
|
|
||||||
|
|
||||||
# Or use LLM_API_KEY instead of OPENAI_API_KEY
|
|
||||||
export LLM_API_KEY="your-api-key-here"
|
|
||||||
python src/test_client.py
|
|
||||||
"""
|
|
||||||
|
|
||||||
import asyncio
|
|
||||||
import os
|
|
||||||
import tempfile
|
|
||||||
import time
|
|
||||||
from contextlib import asynccontextmanager
|
|
||||||
from cognee.shared.logging_utils import setup_logging
|
|
||||||
|
|
||||||
from mcp import ClientSession, StdioServerParameters
|
|
||||||
from mcp.client.stdio import stdio_client
|
|
||||||
|
|
||||||
from cognee.modules.pipelines.models.PipelineRun import PipelineRunStatus
|
|
||||||
from cognee.infrastructure.databases.exceptions import DatabaseNotCreatedError
|
|
||||||
from src.server import (
|
|
||||||
node_to_string,
|
|
||||||
retrieved_edges_to_string,
|
|
||||||
load_class,
|
|
||||||
)
|
|
||||||
|
|
||||||
# Set timeout for cognify/codify to complete in
|
|
||||||
TIMEOUT = 5 * 60 # 5 min in seconds
|
|
||||||
|
|
||||||
|
|
||||||
class CogneeTestClient:
|
|
||||||
"""Test client for Cognee MCP Server functionality."""
|
|
||||||
|
|
||||||
def __init__(self):
|
|
||||||
self.test_results = {}
|
|
||||||
self.temp_files = []
|
|
||||||
|
|
||||||
async def setup(self):
|
|
||||||
"""Setup test environment."""
|
|
||||||
print("🔧 Setting up test environment...")
|
|
||||||
|
|
||||||
# Check for required API keys
|
|
||||||
api_key = os.environ.get("OPENAI_API_KEY") or os.environ.get("LLM_API_KEY")
|
|
||||||
if not api_key:
|
|
||||||
print("⚠️ Warning: No OPENAI_API_KEY or LLM_API_KEY found in environment.")
|
|
||||||
print(" Some tests may fail without proper LLM API configuration.")
|
|
||||||
print(" Set OPENAI_API_KEY environment variable for full functionality.")
|
|
||||||
else:
|
|
||||||
print("✅ API key configured.")
|
|
||||||
|
|
||||||
# Create temporary test files
|
|
||||||
self.test_data_dir = tempfile.mkdtemp(prefix="cognee_test_")
|
|
||||||
|
|
||||||
# Create a test text file
|
|
||||||
self.test_text_file = os.path.join(self.test_data_dir, "test.txt")
|
|
||||||
with open(self.test_text_file, "w") as f:
|
|
||||||
f.write(
|
|
||||||
"This is a test document for Cognee testing. It contains information about AI and knowledge graphs."
|
|
||||||
)
|
|
||||||
|
|
||||||
# Create a test code repository structure
|
|
||||||
self.test_repo_dir = os.path.join(self.test_data_dir, "test_repo")
|
|
||||||
os.makedirs(self.test_repo_dir)
|
|
||||||
|
|
||||||
# Create test Python files
|
|
||||||
test_py_file = os.path.join(self.test_repo_dir, "main.py")
|
|
||||||
with open(test_py_file, "w") as f:
|
|
||||||
f.write("""
|
|
||||||
def hello_world():
|
|
||||||
'''A simple hello world function.'''
|
|
||||||
return "Hello, World!"
|
|
||||||
|
|
||||||
class TestClass:
|
|
||||||
'''A test class for demonstration.'''
|
|
||||||
def __init__(self, name):
|
|
||||||
self.name = name
|
|
||||||
|
|
||||||
def greet(self):
|
|
||||||
return f"Hello, {self.name}!"
|
|
||||||
""")
|
|
||||||
|
|
||||||
# Create a test configuration file
|
|
||||||
config_file = os.path.join(self.test_repo_dir, "config.py")
|
|
||||||
with open(config_file, "w") as f:
|
|
||||||
f.write("""
|
|
||||||
# Configuration settings
|
|
||||||
DATABASE_URL = "sqlite:///test.db"
|
|
||||||
DEBUG = True
|
|
||||||
""")
|
|
||||||
|
|
||||||
# Create test developer rules files
|
|
||||||
cursorrules_file = os.path.join(self.test_data_dir, ".cursorrules")
|
|
||||||
with open(cursorrules_file, "w") as f:
|
|
||||||
f.write("# Test cursor rules\nUse Python best practices.")
|
|
||||||
|
|
||||||
self.temp_files.extend([self.test_text_file, test_py_file, config_file, cursorrules_file])
|
|
||||||
print(f"✅ Test environment created at: {self.test_data_dir}")
|
|
||||||
|
|
||||||
async def cleanup(self):
|
|
||||||
"""Clean up test environment."""
|
|
||||||
print("🧹 Cleaning up test environment...")
|
|
||||||
import shutil
|
|
||||||
|
|
||||||
if os.path.exists(self.test_data_dir):
|
|
||||||
shutil.rmtree(self.test_data_dir)
|
|
||||||
print("✅ Cleanup completed")
|
|
||||||
|
|
||||||
@asynccontextmanager
|
|
||||||
async def mcp_server_session(self):
|
|
||||||
"""Context manager to start and manage MCP server session."""
|
|
||||||
# Get the path to the server script
|
|
||||||
server_script = os.path.join(os.path.dirname(__file__), "server.py")
|
|
||||||
|
|
||||||
# Pass current environment variables to the server process
|
|
||||||
# This ensures OpenAI API key and other config is available
|
|
||||||
server_env = os.environ.copy()
|
|
||||||
|
|
||||||
# Start the server process
|
|
||||||
server_params = StdioServerParameters(
|
|
||||||
command="python",
|
|
||||||
args=[server_script, "--transport", "stdio"],
|
|
||||||
env=server_env,
|
|
||||||
)
|
|
||||||
|
|
||||||
async with stdio_client(server_params) as (read, write):
|
|
||||||
async with ClientSession(read, write) as session:
|
|
||||||
# Initialize the session
|
|
||||||
await session.initialize()
|
|
||||||
yield session
|
|
||||||
|
|
||||||
async def test_mcp_server_startup_and_tools(self):
|
|
||||||
"""Test that the MCP server starts properly and returns tool results."""
|
|
||||||
print("\n🧪 Testing MCP server startup and tool execution...")
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with self.mcp_server_session() as session:
|
|
||||||
# Test 1: List available tools
|
|
||||||
print(" 🔍 Testing tool discovery...")
|
|
||||||
tools_result = await session.list_tools()
|
|
||||||
|
|
||||||
expected_tools = {
|
|
||||||
"cognify",
|
|
||||||
"codify",
|
|
||||||
"search",
|
|
||||||
"prune",
|
|
||||||
"cognify_status",
|
|
||||||
"codify_status",
|
|
||||||
"cognee_add_developer_rules",
|
|
||||||
"list_data",
|
|
||||||
"delete",
|
|
||||||
}
|
|
||||||
available_tools = {tool.name for tool in tools_result.tools}
|
|
||||||
|
|
||||||
if not expected_tools.issubset(available_tools):
|
|
||||||
missing_tools = expected_tools - available_tools
|
|
||||||
raise AssertionError(f"Missing expected tools: {missing_tools}")
|
|
||||||
|
|
||||||
print(
|
|
||||||
f" ✅ Found {len(available_tools)} tools: {', '.join(sorted(available_tools))}"
|
|
||||||
)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
self.test_results["mcp_server_integration"] = {
|
|
||||||
"status": "FAIL",
|
|
||||||
"error": str(e),
|
|
||||||
"message": "MCP server integration test failed",
|
|
||||||
}
|
|
||||||
print(f"❌ MCP server integration test failed: {e}")
|
|
||||||
|
|
||||||
async def test_prune(self):
|
|
||||||
"""Test the prune functionality using MCP client."""
|
|
||||||
print("\n🧪 Testing prune functionality...")
|
|
||||||
try:
|
|
||||||
async with self.mcp_server_session() as session:
|
|
||||||
result = await session.call_tool("prune", arguments={})
|
|
||||||
self.test_results["prune"] = {
|
|
||||||
"status": "PASS",
|
|
||||||
"result": result,
|
|
||||||
"message": "Prune executed successfully",
|
|
||||||
}
|
|
||||||
print("✅ Prune test passed")
|
|
||||||
except Exception as e:
|
|
||||||
self.test_results["prune"] = {
|
|
||||||
"status": "FAIL",
|
|
||||||
"error": str(e),
|
|
||||||
"message": "Prune test failed",
|
|
||||||
}
|
|
||||||
print(f"❌ Prune test failed: {e}")
|
|
||||||
raise e
|
|
||||||
|
|
||||||
async def test_cognify(self, test_text, test_name):
|
|
||||||
"""Test the cognify functionality using MCP client."""
|
|
||||||
print("\n🧪 Testing cognify functionality...")
|
|
||||||
try:
|
|
||||||
# Test with simple text using MCP client
|
|
||||||
async with self.mcp_server_session() as session:
|
|
||||||
cognify_result = await session.call_tool("cognify", arguments={"data": test_text})
|
|
||||||
|
|
||||||
start = time.time() # mark the start
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
# Wait a moment
|
|
||||||
await asyncio.sleep(5)
|
|
||||||
|
|
||||||
# Check if cognify processing is finished
|
|
||||||
status_result = await session.call_tool("cognify_status", arguments={})
|
|
||||||
if hasattr(status_result, "content") and status_result.content:
|
|
||||||
status_text = (
|
|
||||||
status_result.content[0].text
|
|
||||||
if status_result.content
|
|
||||||
else str(status_result)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
status_text = str(status_result)
|
|
||||||
|
|
||||||
if str(PipelineRunStatus.DATASET_PROCESSING_COMPLETED) in status_text:
|
|
||||||
break
|
|
||||||
elif time.time() - start > TIMEOUT:
|
|
||||||
raise TimeoutError("Cognify did not complete in 5min")
|
|
||||||
except DatabaseNotCreatedError:
|
|
||||||
if time.time() - start > TIMEOUT:
|
|
||||||
raise TimeoutError("Database was not created in 5min")
|
|
||||||
|
|
||||||
self.test_results[test_name] = {
|
|
||||||
"status": "PASS",
|
|
||||||
"result": cognify_result,
|
|
||||||
"message": f"{test_name} executed successfully",
|
|
||||||
}
|
|
||||||
print(f"✅ {test_name} test passed")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
self.test_results[test_name] = {
|
|
||||||
"status": "FAIL",
|
|
||||||
"error": str(e),
|
|
||||||
"message": f"{test_name} test failed",
|
|
||||||
}
|
|
||||||
print(f"❌ {test_name} test failed: {e}")
|
|
||||||
|
|
||||||
async def test_codify(self):
|
|
||||||
"""Test the codify functionality using MCP client."""
|
|
||||||
print("\n🧪 Testing codify functionality...")
|
|
||||||
try:
|
|
||||||
async with self.mcp_server_session() as session:
|
|
||||||
codify_result = await session.call_tool(
|
|
||||||
"codify", arguments={"repo_path": self.test_repo_dir}
|
|
||||||
)
|
|
||||||
|
|
||||||
start = time.time() # mark the start
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
# Wait a moment
|
|
||||||
await asyncio.sleep(5)
|
|
||||||
|
|
||||||
# Check if codify processing is finished
|
|
||||||
status_result = await session.call_tool("codify_status", arguments={})
|
|
||||||
if hasattr(status_result, "content") and status_result.content:
|
|
||||||
status_text = (
|
|
||||||
status_result.content[0].text
|
|
||||||
if status_result.content
|
|
||||||
else str(status_result)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
status_text = str(status_result)
|
|
||||||
|
|
||||||
if str(PipelineRunStatus.DATASET_PROCESSING_COMPLETED) in status_text:
|
|
||||||
break
|
|
||||||
elif time.time() - start > TIMEOUT:
|
|
||||||
raise TimeoutError("Codify did not complete in 5min")
|
|
||||||
except DatabaseNotCreatedError:
|
|
||||||
if time.time() - start > TIMEOUT:
|
|
||||||
raise TimeoutError("Database was not created in 5min")
|
|
||||||
|
|
||||||
self.test_results["codify"] = {
|
|
||||||
"status": "PASS",
|
|
||||||
"result": codify_result,
|
|
||||||
"message": "Codify executed successfully",
|
|
||||||
}
|
|
||||||
print("✅ Codify test passed")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
self.test_results["codify"] = {
|
|
||||||
"status": "FAIL",
|
|
||||||
"error": str(e),
|
|
||||||
"message": "Codify test failed",
|
|
||||||
}
|
|
||||||
print(f"❌ Codify test failed: {e}")
|
|
||||||
|
|
||||||
async def test_cognee_add_developer_rules(self):
|
|
||||||
"""Test the cognee_add_developer_rules functionality using MCP client."""
|
|
||||||
print("\n🧪 Testing cognee_add_developer_rules functionality...")
|
|
||||||
try:
|
|
||||||
async with self.mcp_server_session() as session:
|
|
||||||
result = await session.call_tool(
|
|
||||||
"cognee_add_developer_rules", arguments={"base_path": self.test_data_dir}
|
|
||||||
)
|
|
||||||
|
|
||||||
start = time.time() # mark the start
|
|
||||||
while True:
|
|
||||||
try:
|
|
||||||
# Wait a moment
|
|
||||||
await asyncio.sleep(5)
|
|
||||||
|
|
||||||
# Check if developer rule cognify processing is finished
|
|
||||||
status_result = await session.call_tool("cognify_status", arguments={})
|
|
||||||
if hasattr(status_result, "content") and status_result.content:
|
|
||||||
status_text = (
|
|
||||||
status_result.content[0].text
|
|
||||||
if status_result.content
|
|
||||||
else str(status_result)
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
status_text = str(status_result)
|
|
||||||
|
|
||||||
if str(PipelineRunStatus.DATASET_PROCESSING_COMPLETED) in status_text:
|
|
||||||
break
|
|
||||||
elif time.time() - start > TIMEOUT:
|
|
||||||
raise TimeoutError(
|
|
||||||
"Cognify of developer rules did not complete in 5min"
|
|
||||||
)
|
|
||||||
except DatabaseNotCreatedError:
|
|
||||||
if time.time() - start > TIMEOUT:
|
|
||||||
raise TimeoutError("Database was not created in 5min")
|
|
||||||
|
|
||||||
self.test_results["cognee_add_developer_rules"] = {
|
|
||||||
"status": "PASS",
|
|
||||||
"result": result,
|
|
||||||
"message": "Developer rules addition executed successfully",
|
|
||||||
}
|
|
||||||
print("✅ Developer rules test passed")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
self.test_results["cognee_add_developer_rules"] = {
|
|
||||||
"status": "FAIL",
|
|
||||||
"error": str(e),
|
|
||||||
"message": "Developer rules test failed",
|
|
||||||
}
|
|
||||||
print(f"❌ Developer rules test failed: {e}")
|
|
||||||
|
|
||||||
async def test_search_functionality(self):
|
|
||||||
"""Test the search functionality with different search types using MCP client."""
|
|
||||||
print("\n🧪 Testing search functionality...")
|
|
||||||
|
|
||||||
search_query = "What is artificial intelligence?"
|
|
||||||
|
|
||||||
# Test if all search types will execute
|
|
||||||
from cognee import SearchType
|
|
||||||
|
|
||||||
# Go through all Cognee search types
|
|
||||||
for search_type in SearchType:
|
|
||||||
# Don't test these search types
|
|
||||||
if search_type in [SearchType.NATURAL_LANGUAGE, SearchType.CYPHER]:
|
|
||||||
break
|
|
||||||
try:
|
|
||||||
async with self.mcp_server_session() as session:
|
|
||||||
result = await session.call_tool(
|
|
||||||
"search",
|
|
||||||
arguments={"search_query": search_query, "search_type": search_type.value},
|
|
||||||
)
|
|
||||||
self.test_results[f"search_{search_type}"] = {
|
|
||||||
"status": "PASS",
|
|
||||||
"result": result,
|
|
||||||
"message": f"Search with {search_type} successful",
|
|
||||||
}
|
|
||||||
print(f"✅ Search {search_type} test passed")
|
|
||||||
except Exception as e:
|
|
||||||
self.test_results[f"search_{search_type}"] = {
|
|
||||||
"status": "FAIL",
|
|
||||||
"error": str(e),
|
|
||||||
"message": f"Search with {search_type} failed",
|
|
||||||
}
|
|
||||||
print(f"❌ Search {search_type} test failed: {e}")
|
|
||||||
|
|
||||||
async def test_list_data(self):
|
|
||||||
"""Test the list_data functionality."""
|
|
||||||
print("\n🧪 Testing list_data functionality...")
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with self.mcp_server_session() as session:
|
|
||||||
# Test listing all datasets
|
|
||||||
result = await session.call_tool("list_data", arguments={})
|
|
||||||
|
|
||||||
if result.content and len(result.content) > 0:
|
|
||||||
content = result.content[0].text
|
|
||||||
|
|
||||||
# Check if the output contains expected elements
|
|
||||||
if "Available Datasets:" in content or "No datasets found" in content:
|
|
||||||
self.test_results["list_data_all"] = {
|
|
||||||
"status": "PASS",
|
|
||||||
"result": content[:200] + "..." if len(content) > 200 else content,
|
|
||||||
"message": "list_data (all datasets) successful",
|
|
||||||
}
|
|
||||||
print("✅ list_data (all datasets) test passed")
|
|
||||||
|
|
||||||
# If there are datasets, try to list data for the first one
|
|
||||||
if "Dataset ID:" in content:
|
|
||||||
# Extract the first dataset ID from the output
|
|
||||||
lines = content.split("\n")
|
|
||||||
dataset_id = None
|
|
||||||
for line in lines:
|
|
||||||
if "Dataset ID:" in line:
|
|
||||||
dataset_id = line.split("Dataset ID:")[1].strip()
|
|
||||||
break
|
|
||||||
|
|
||||||
if dataset_id:
|
|
||||||
# Test listing data for specific dataset
|
|
||||||
specific_result = await session.call_tool(
|
|
||||||
"list_data", arguments={"dataset_id": dataset_id}
|
|
||||||
)
|
|
||||||
|
|
||||||
if specific_result.content and len(specific_result.content) > 0:
|
|
||||||
specific_content = specific_result.content[0].text
|
|
||||||
if "Dataset:" in specific_content:
|
|
||||||
self.test_results["list_data_specific"] = {
|
|
||||||
"status": "PASS",
|
|
||||||
"result": specific_content[:200] + "..."
|
|
||||||
if len(specific_content) > 200
|
|
||||||
else specific_content,
|
|
||||||
"message": "list_data (specific dataset) successful",
|
|
||||||
}
|
|
||||||
print("✅ list_data (specific dataset) test passed")
|
|
||||||
else:
|
|
||||||
raise Exception(
|
|
||||||
"Specific dataset listing returned unexpected format"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
raise Exception("Specific dataset listing returned no content")
|
|
||||||
else:
|
|
||||||
raise Exception("list_data returned unexpected format")
|
|
||||||
else:
|
|
||||||
raise Exception("list_data returned no content")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
self.test_results["list_data"] = {
|
|
||||||
"status": "FAIL",
|
|
||||||
"error": str(e),
|
|
||||||
"message": "list_data test failed",
|
|
||||||
}
|
|
||||||
print(f"❌ list_data test failed: {e}")
|
|
||||||
|
|
||||||
async def test_delete(self):
|
|
||||||
"""Test the delete functionality."""
|
|
||||||
print("\n🧪 Testing delete functionality...")
|
|
||||||
|
|
||||||
try:
|
|
||||||
async with self.mcp_server_session() as session:
|
|
||||||
# First, let's get available data to delete
|
|
||||||
list_result = await session.call_tool("list_data", arguments={})
|
|
||||||
|
|
||||||
if not (list_result.content and len(list_result.content) > 0):
|
|
||||||
raise Exception("No data available for delete test - list_data returned empty")
|
|
||||||
|
|
||||||
content = list_result.content[0].text
|
|
||||||
|
|
||||||
# Look for data IDs and dataset IDs in the content
|
|
||||||
lines = content.split("\n")
|
|
||||||
dataset_id = None
|
|
||||||
data_id = None
|
|
||||||
|
|
||||||
for line in lines:
|
|
||||||
if "Dataset ID:" in line:
|
|
||||||
dataset_id = line.split("Dataset ID:")[1].strip()
|
|
||||||
elif "Data ID:" in line:
|
|
||||||
data_id = line.split("Data ID:")[1].strip()
|
|
||||||
break # Get the first data item
|
|
||||||
|
|
||||||
if dataset_id and data_id:
|
|
||||||
# Test soft delete (default)
|
|
||||||
delete_result = await session.call_tool(
|
|
||||||
"delete",
|
|
||||||
arguments={"data_id": data_id, "dataset_id": dataset_id, "mode": "soft"},
|
|
||||||
)
|
|
||||||
|
|
||||||
if delete_result.content and len(delete_result.content) > 0:
|
|
||||||
delete_content = delete_result.content[0].text
|
|
||||||
|
|
||||||
if "Delete operation completed successfully" in delete_content:
|
|
||||||
self.test_results["delete_soft"] = {
|
|
||||||
"status": "PASS",
|
|
||||||
"result": delete_content[:200] + "..."
|
|
||||||
if len(delete_content) > 200
|
|
||||||
else delete_content,
|
|
||||||
"message": "delete (soft mode) successful",
|
|
||||||
}
|
|
||||||
print("✅ delete (soft mode) test passed")
|
|
||||||
else:
|
|
||||||
# Check if it's an expected error (like document not found)
|
|
||||||
if "not found" in delete_content.lower():
|
|
||||||
self.test_results["delete_soft"] = {
|
|
||||||
"status": "PASS",
|
|
||||||
"result": delete_content,
|
|
||||||
"message": "delete test passed with expected 'not found' error",
|
|
||||||
}
|
|
||||||
print("✅ delete test passed (expected 'not found' error)")
|
|
||||||
else:
|
|
||||||
raise Exception(
|
|
||||||
f"Delete returned unexpected content: {delete_content}"
|
|
||||||
)
|
|
||||||
else:
|
|
||||||
raise Exception("Delete returned no content")
|
|
||||||
|
|
||||||
else:
|
|
||||||
# Test with invalid UUIDs to check error handling
|
|
||||||
invalid_result = await session.call_tool(
|
|
||||||
"delete",
|
|
||||||
arguments={
|
|
||||||
"data_id": "invalid-uuid",
|
|
||||||
"dataset_id": "another-invalid-uuid",
|
|
||||||
"mode": "soft",
|
|
||||||
},
|
|
||||||
)
|
|
||||||
|
|
||||||
if invalid_result.content and len(invalid_result.content) > 0:
|
|
||||||
invalid_content = invalid_result.content[0].text
|
|
||||||
|
|
||||||
if "Invalid UUID format" in invalid_content:
|
|
||||||
self.test_results["delete_error_handling"] = {
|
|
||||||
"status": "PASS",
|
|
||||||
"result": invalid_content,
|
|
||||||
"message": "delete error handling works correctly",
|
|
||||||
}
|
|
||||||
print("✅ delete error handling test passed")
|
|
||||||
else:
|
|
||||||
raise Exception(f"Expected UUID error not found: {invalid_content}")
|
|
||||||
else:
|
|
||||||
raise Exception("Delete error test returned no content")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
self.test_results["delete"] = {
|
|
||||||
"status": "FAIL",
|
|
||||||
"error": str(e),
|
|
||||||
"message": "delete test failed",
|
|
||||||
}
|
|
||||||
print(f"❌ delete test failed: {e}")
|
|
||||||
|
|
||||||
def test_utility_functions(self):
|
|
||||||
"""Test utility functions."""
|
|
||||||
print("\n🧪 Testing utility functions...")
|
|
||||||
|
|
||||||
# Test node_to_string
|
|
||||||
try:
|
|
||||||
test_node = {"id": "test_id", "name": "test_name", "type": "test_type"}
|
|
||||||
result = node_to_string(test_node)
|
|
||||||
expected = 'Node(id: "test_id", name: "test_name")'
|
|
||||||
|
|
||||||
if result == expected:
|
|
||||||
self.test_results["node_to_string"] = {
|
|
||||||
"status": "PASS",
|
|
||||||
"result": result,
|
|
||||||
"message": "node_to_string function works correctly",
|
|
||||||
}
|
|
||||||
print("✅ node_to_string test passed")
|
|
||||||
else:
|
|
||||||
self.test_results["node_to_string"] = {
|
|
||||||
"status": "FAIL",
|
|
||||||
"result": result,
|
|
||||||
"expected": expected,
|
|
||||||
"message": "node_to_string function output mismatch",
|
|
||||||
}
|
|
||||||
print(f"❌ node_to_string test failed: expected {expected}, got {result}")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
self.test_results["node_to_string"] = {
|
|
||||||
"status": "FAIL",
|
|
||||||
"error": str(e),
|
|
||||||
"message": "node_to_string test failed",
|
|
||||||
}
|
|
||||||
print(f"❌ node_to_string test failed: {e}")
|
|
||||||
|
|
||||||
# Test retrieved_edges_to_string
|
|
||||||
try:
|
|
||||||
test_triplet = [
|
|
||||||
(
|
|
||||||
{"id": "node1", "name": "Node1"},
|
|
||||||
{"relationship_name": "CONNECTS_TO"},
|
|
||||||
{"id": "node2", "name": "Node2"},
|
|
||||||
)
|
|
||||||
]
|
|
||||||
result = retrieved_edges_to_string(test_triplet)
|
|
||||||
expected = (
|
|
||||||
'Node(id: "node1", name: "Node1") CONNECTS_TO Node(id: "node2", name: "Node2")'
|
|
||||||
)
|
|
||||||
if result == expected:
|
|
||||||
self.test_results["retrieved_edges_to_string"] = {
|
|
||||||
"status": "PASS",
|
|
||||||
"result": result,
|
|
||||||
"message": "retrieved_edges_to_string function works correctly",
|
|
||||||
}
|
|
||||||
print("✅ retrieved_edges_to_string test passed")
|
|
||||||
else:
|
|
||||||
self.test_results["retrieved_edges_to_string"] = {
|
|
||||||
"status": "FAIL",
|
|
||||||
"result": result,
|
|
||||||
"expected": expected,
|
|
||||||
"message": "retrieved_edges_to_string function output mismatch",
|
|
||||||
}
|
|
||||||
print(
|
|
||||||
f"❌ retrieved_edges_to_string test failed: expected {expected}, got {result}"
|
|
||||||
)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
self.test_results["retrieved_edges_to_string"] = {
|
|
||||||
"status": "FAIL",
|
|
||||||
"error": str(e),
|
|
||||||
"message": "retrieved_edges_to_string test failed",
|
|
||||||
}
|
|
||||||
print(f"❌ retrieved_edges_to_string test failed: {e}")
|
|
||||||
|
|
||||||
def test_load_class_function(self):
|
|
||||||
"""Test load_class function."""
|
|
||||||
print("\n🧪 Testing load_class function...")
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Create a temporary Python file with a test class
|
|
||||||
test_module_file = os.path.join(self.test_data_dir, "test_model.py")
|
|
||||||
with open(test_module_file, "w") as f:
|
|
||||||
f.write("""
|
|
||||||
class TestModel:
|
|
||||||
def __init__(self):
|
|
||||||
self.name = "TestModel"
|
|
||||||
|
|
||||||
def get_name(self):
|
|
||||||
return self.name
|
|
||||||
""")
|
|
||||||
|
|
||||||
# Test loading the class
|
|
||||||
loaded_class = load_class(test_module_file, "TestModel")
|
|
||||||
instance = loaded_class()
|
|
||||||
|
|
||||||
if hasattr(instance, "get_name") and instance.get_name() == "TestModel":
|
|
||||||
self.test_results["load_class"] = {
|
|
||||||
"status": "PASS",
|
|
||||||
"message": "load_class function works correctly",
|
|
||||||
}
|
|
||||||
print("✅ load_class test passed")
|
|
||||||
else:
|
|
||||||
self.test_results["load_class"] = {
|
|
||||||
"status": "FAIL",
|
|
||||||
"message": "load_class function did not load class correctly",
|
|
||||||
}
|
|
||||||
print("❌ load_class test failed: class not loaded correctly")
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
self.test_results["load_class"] = {
|
|
||||||
"status": "FAIL",
|
|
||||||
"error": str(e),
|
|
||||||
"message": "load_class test failed",
|
|
||||||
}
|
|
||||||
print(f"❌ load_class test failed: {e}")
|
|
||||||
|
|
||||||
async def run_all_tests(self):
|
|
||||||
"""Run all tests."""
|
|
||||||
print("🚀 Starting Cognee MCP Server Test Suite")
|
|
||||||
print("=" * 50)
|
|
||||||
|
|
||||||
await self.setup()
|
|
||||||
|
|
||||||
# Test MCP server integration first
|
|
||||||
await self.test_mcp_server_startup_and_tools()
|
|
||||||
|
|
||||||
# Run tests in logical order
|
|
||||||
await self.test_prune() # Start with clean slate
|
|
||||||
|
|
||||||
# Test cognify twice to make sure updating a dataset with new docs is working as expected
|
|
||||||
await self.test_cognify(
|
|
||||||
test_text="Artificial Intelligence is transforming the world through machine learning and deep learning technologies.",
|
|
||||||
test_name="Cognify1",
|
|
||||||
)
|
|
||||||
await self.test_cognify(
|
|
||||||
test_text="Natural language processing (NLP) is an interdisciplinary subfield of computer science and information retrieval.",
|
|
||||||
test_name="Cognify2",
|
|
||||||
)
|
|
||||||
|
|
||||||
await self.test_codify()
|
|
||||||
await self.test_cognee_add_developer_rules()
|
|
||||||
|
|
||||||
# Test list_data and delete functionality
|
|
||||||
await self.test_list_data()
|
|
||||||
await self.test_delete()
|
|
||||||
|
|
||||||
await self.test_search_functionality()
|
|
||||||
|
|
||||||
# Test utility functions (synchronous)
|
|
||||||
self.test_utility_functions()
|
|
||||||
self.test_load_class_function()
|
|
||||||
|
|
||||||
await self.cleanup()
|
|
||||||
|
|
||||||
# Print summary
|
|
||||||
self.print_test_summary()
|
|
||||||
|
|
||||||
def print_test_summary(self):
|
|
||||||
"""Print test results summary."""
|
|
||||||
print("\n" + "=" * 50)
|
|
||||||
print("📊 TEST RESULTS SUMMARY")
|
|
||||||
print("=" * 50)
|
|
||||||
|
|
||||||
passed = 0
|
|
||||||
failed = 0
|
|
||||||
|
|
||||||
for test_name, result in self.test_results.items():
|
|
||||||
if result["status"] == "PASS":
|
|
||||||
status_emoji = "✅"
|
|
||||||
passed += 1
|
|
||||||
else:
|
|
||||||
status_emoji = "❌"
|
|
||||||
failed += 1
|
|
||||||
|
|
||||||
print(f"{status_emoji} {test_name}: {result['status']}")
|
|
||||||
|
|
||||||
if result["status"] == "FAIL" and "error" in result:
|
|
||||||
print(f" Error: {result['error']}")
|
|
||||||
|
|
||||||
print("\n" + "-" * 50)
|
|
||||||
total_tests = passed + failed
|
|
||||||
print(f"Total Tests: {total_tests}")
|
|
||||||
print(f"Passed: {passed}")
|
|
||||||
print(f"Failed: {failed}")
|
|
||||||
print(f"Success Rate: {(passed / total_tests * 100):.1f}%")
|
|
||||||
|
|
||||||
if failed > 0:
|
|
||||||
print(f"\n ⚠️ {failed} test(s) failed - review results above for details")
|
|
||||||
|
|
||||||
|
|
||||||
async def main():
|
|
||||||
"""Main function to run the test suite."""
|
|
||||||
client = CogneeTestClient()
|
|
||||||
await client.run_all_tests()
|
|
||||||
|
|
||||||
|
|
||||||
if __name__ == "__main__":
|
|
||||||
from logging import ERROR
|
|
||||||
|
|
||||||
logger = setup_logging(log_level=ERROR)
|
|
||||||
asyncio.run(main())
|
|
||||||
0
cognee-mcp/tests/__init__.py
Normal file
0
cognee-mcp/tests/__init__.py
Normal file
23
cognee-mcp/tests/conftest.py
Normal file
23
cognee-mcp/tests/conftest.py
Normal file
|
|
@ -0,0 +1,23 @@
|
||||||
|
import pathlib
|
||||||
|
import pytest
|
||||||
|
import cognee
|
||||||
|
from src import server
|
||||||
|
from src.cognee_client import CogneeClient
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(autouse=True)
|
||||||
|
async def setup_isolated_cognee(request):
|
||||||
|
test_name = request.node.name
|
||||||
|
test_base = pathlib.Path(request.fspath).parent
|
||||||
|
|
||||||
|
cognee.config.data_root_directory(str(test_base / f".data_storage/{test_name}"))
|
||||||
|
cognee.config.system_root_directory(str(test_base / f".cognee_system/{test_name}"))
|
||||||
|
|
||||||
|
await cognee.prune.prune_data()
|
||||||
|
await cognee.prune.prune_system(metadata=True)
|
||||||
|
|
||||||
|
server.cognee_client = CogneeClient(api_url=None, api_token=None)
|
||||||
|
|
||||||
|
yield
|
||||||
|
|
||||||
|
server.cognee_client = None
|
||||||
0
cognee-mcp/tests/tools/__init__.py
Normal file
0
cognee-mcp/tests/tools/__init__.py
Normal file
31
cognee-mcp/tests/tools/test_basic_tools.py
Normal file
31
cognee-mcp/tests/tools/test_basic_tools.py
Normal file
|
|
@ -0,0 +1,31 @@
|
||||||
|
"""
|
||||||
|
Tests for basic MCP tools: prune, cognify_status
|
||||||
|
|
||||||
|
These are integration tests that test the actual tool behavior.
|
||||||
|
Run with: pytest tests/tools/test_basic_tools.py -v
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import mcp.types as types
|
||||||
|
|
||||||
|
from src import server
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_prune():
|
||||||
|
"""Test prune tool - removes all data from knowledge graph"""
|
||||||
|
result = await server.prune()
|
||||||
|
|
||||||
|
assert len(result) == 1
|
||||||
|
assert isinstance(result[0], types.TextContent)
|
||||||
|
assert "Pruned" in result[0].text or "not available" in result[0].text
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_cognify_status():
|
||||||
|
"""Test cognify_status tool - gets status of cognify pipeline"""
|
||||||
|
result = await server.cognify_status()
|
||||||
|
|
||||||
|
assert len(result) == 1
|
||||||
|
assert isinstance(result[0], types.TextContent)
|
||||||
|
assert len(result[0].text) > 0
|
||||||
21
cognee-mcp/tests/tools/test_cognify.py
Normal file
21
cognee-mcp/tests/tools/test_cognify.py
Normal file
|
|
@ -0,0 +1,21 @@
|
||||||
|
"""
|
||||||
|
Test for cognify tool
|
||||||
|
|
||||||
|
These are integration tests that test the actual tool behavior.
|
||||||
|
Run with: pytest tests/tools/test_cognify.py -v
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import mcp.types as types
|
||||||
|
|
||||||
|
from src import server
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_cognify():
|
||||||
|
"""Test cognify tool - launches background task to process data"""
|
||||||
|
result = await server.cognify(data="Test data for cognify")
|
||||||
|
|
||||||
|
assert len(result) == 1
|
||||||
|
assert isinstance(result[0], types.TextContent)
|
||||||
|
assert "Background process" in result[0].text or "launched" in result[0].text
|
||||||
38
cognee-mcp/tests/tools/test_list_data.py
Normal file
38
cognee-mcp/tests/tools/test_list_data.py
Normal file
|
|
@ -0,0 +1,38 @@
|
||||||
|
"""
|
||||||
|
Tests for data management tools: list_data, delete
|
||||||
|
|
||||||
|
These are integration tests that test the actual tool behavior.
|
||||||
|
Run with: pytest tests/tools/test_data_tools.py -v
|
||||||
|
"""
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
import mcp.types as types
|
||||||
|
import cognee
|
||||||
|
|
||||||
|
from src import server
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_list_data_when_empty():
|
||||||
|
"""Test list_data tool - lists all datasets"""
|
||||||
|
result = await server.list_data()
|
||||||
|
|
||||||
|
assert len(result) == 1
|
||||||
|
assert isinstance(result[0], types.TextContent)
|
||||||
|
assert (
|
||||||
|
"❌ Failed to list data: DatabaseNotCreatedError: The database has not been created yet"
|
||||||
|
in result[0].text
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_list_data_when_cognified():
|
||||||
|
"""Test list_data tool - lists all datasets"""
|
||||||
|
await cognee.add("Every node tells a story. Most of mine are unit tests.")
|
||||||
|
await cognee.cognify()
|
||||||
|
|
||||||
|
result = await server.list_data()
|
||||||
|
|
||||||
|
assert len(result) == 1
|
||||||
|
assert isinstance(result[0], types.TextContent)
|
||||||
|
assert all(s in result[0].text for s in ["📂 Available Datasets", "Dataset ID: "])
|
||||||
92
cognee-mcp/tests/tools/test_search.py
Normal file
92
cognee-mcp/tests/tools/test_search.py
Normal file
|
|
@ -0,0 +1,92 @@
|
||||||
|
"""
|
||||||
|
Test for search tool
|
||||||
|
|
||||||
|
These are integration tests that test the actual tool behavior.
|
||||||
|
Run with: pytest tests/tools/test_search.py -v
|
||||||
|
"""
|
||||||
|
|
||||||
|
import cognee
|
||||||
|
from cognee.infrastructure.databases.exceptions import DatabaseNotCreatedError
|
||||||
|
import pytest
|
||||||
|
import mcp.types as types
|
||||||
|
from cognee import SearchType
|
||||||
|
|
||||||
|
from src import server
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_search_handles_database_not_ready():
|
||||||
|
"""Test search tool handles database not ready scenario gracefully"""
|
||||||
|
with pytest.raises(DatabaseNotCreatedError):
|
||||||
|
await server.search(search_query="test query", search_type="GRAPH_COMPLETION")
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_search_graph_completion():
|
||||||
|
"""Test search with GRAPH_COMPLETION type"""
|
||||||
|
await cognee.add("Artificial intelligence and machine learning are transforming technology.")
|
||||||
|
await cognee.cognify()
|
||||||
|
|
||||||
|
result = await server.search(
|
||||||
|
search_query="What is AI?", search_type=SearchType.GRAPH_COMPLETION.value
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(result) == 1
|
||||||
|
assert isinstance(result[0], types.TextContent)
|
||||||
|
assert result[0].text is not None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_search_rag_completion():
|
||||||
|
"""Test search with RAG_COMPLETION type"""
|
||||||
|
await cognee.add("Python is a programming language that emphasizes readability.")
|
||||||
|
await cognee.cognify()
|
||||||
|
|
||||||
|
result = await server.search(
|
||||||
|
search_query="What is Python?", search_type=SearchType.RAG_COMPLETION.value
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(result) == 1
|
||||||
|
assert isinstance(result[0], types.TextContent)
|
||||||
|
assert result[0].text is not None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_search_chunks():
|
||||||
|
"""Test search with CHUNKS type"""
|
||||||
|
await cognee.add("JavaScript is the language of the web browser.")
|
||||||
|
await cognee.cognify()
|
||||||
|
|
||||||
|
result = await server.search(search_query="web browser", search_type=SearchType.CHUNKS.value)
|
||||||
|
|
||||||
|
assert len(result) == 1
|
||||||
|
assert isinstance(result[0], types.TextContent)
|
||||||
|
assert result[0].text is not None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_search_summaries():
|
||||||
|
"""Test search with SUMMARIES type"""
|
||||||
|
await cognee.add("Database systems manage and store structured data efficiently.")
|
||||||
|
await cognee.cognify()
|
||||||
|
|
||||||
|
result = await server.search(search_query="database", search_type=SearchType.SUMMARIES.value)
|
||||||
|
|
||||||
|
assert len(result) == 1
|
||||||
|
assert isinstance(result[0], types.TextContent)
|
||||||
|
assert result[0].text is not None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_search_feeling_lucky():
|
||||||
|
"""Test search with FEELING_LUCKY type"""
|
||||||
|
await cognee.add("Machine learning models learn patterns from data.")
|
||||||
|
await cognee.cognify()
|
||||||
|
|
||||||
|
result = await server.search(
|
||||||
|
search_query="learning", search_type=SearchType.FEELING_LUCKY.value
|
||||||
|
)
|
||||||
|
|
||||||
|
assert len(result) == 1
|
||||||
|
assert isinstance(result[0], types.TextContent)
|
||||||
|
assert result[0].text is not None
|
||||||
7641
cognee-mcp/uv.lock
generated
7641
cognee-mcp/uv.lock
generated
File diff suppressed because it is too large
Load diff
Loading…
Add table
Reference in a new issue