Feat/mcp add support for non standalone mode (#1523)

<!-- .github/pull_request_template.md -->

## Description
<!--
Please provide a clear, human-generated description of the changes in
this PR.
DO NOT use AI-generated descriptions. We want to understand your thought
process and reasoning.
-->

From version 0.3.5 onwards, we start Cognee MCP alongside the cognee UI in
`cognee-cli -ui`.

Currently, cognee-mcp operates as a standalone cognee instance, with
its own knowledge graph.

This PR:
1. adds support for cognee-mcp to run in non-standalone mode, as an
interface to an already running cognee backend.
2. updates the `cognee-cli -ui` MCP startup.

## Type of Change
<!-- Please check the relevant option -->
- [ ] Bug fix (non-breaking change that fixes an issue)
- [x] New feature (non-breaking change that adds functionality)
- [ ] Breaking change (fix or feature that would cause existing
functionality to change)
- [ ] Documentation update
- [ ] Code refactoring
- [ ] Performance improvement
- [ ] Other (please specify):

## Screenshots/Videos (if applicable)
<!-- Add screenshots or videos to help explain your changes -->

## Pre-submission Checklist
<!-- Please check all boxes that apply before submitting your PR -->
- [x] **I have tested my changes thoroughly before submitting this PR**
- [x] **This PR contains minimal changes necessary to address the
issue/feature**
- [ ] My code follows the project's coding standards and style
guidelines
- [ ] I have added tests that prove my fix is effective or that my
feature works
- [ ] I have added necessary documentation (if applicable)
- [ ] All new and existing tests pass
- [ ] I have searched existing PRs to ensure this change hasn't been
submitted already
- [ ] I have linked any relevant issues in the description
- [ ] My commits have clear and descriptive messages

## DCO Affirmation
I affirm that all code in every commit of this pull request conforms to
the terms of the Topoteretes Developer Certificate of Origin.
Commit c6d12e89c7 by Vasilije, 2025-10-12 14:10:55 +02:00, committed by GitHub
(GPG key ID: B5690EEEBB952194; no known key found for this signature in database).
10 changed files with 847 additions and 147 deletions.


@@ -65,6 +65,9 @@ ENV PYTHONUNBUFFERED=1
ENV MCP_LOG_LEVEL=DEBUG
ENV PYTHONPATH=/app

# Add labels for API mode usage
LABEL org.opencontainers.image.description="Cognee MCP Server with API mode support"

# Use the application name from pyproject.toml for normal operation
# For testing, we'll override this with a direct command
ENTRYPOINT ["/app/entrypoint.sh"]


@@ -38,7 +38,8 @@ Build memory for Agents and query from any client that speaks MCP – in your t
## ✨ Features
- Multiple transports – choose Streamable HTTP `--transport http` (recommended for web deployments), SSE `--transport sse` (real-time streaming), or stdio (classic pipe, default)
- **API Mode** – connect to an already running Cognee FastAPI server instead of using cognee directly (see [API Mode](#-api-mode) below)
- Integrated logging – all actions written to a rotating file (see `get_log_file_location()`) and mirrored to console in dev
- Local file ingestion – feed .md, source files, Cursor rulesets, etc. straight from disk
- Background pipelines – long-running cognify & codify jobs spawn off-thread; check progress with status tools
- Developer rules bootstrap – one call indexes .cursorrules, .cursor/rules, AGENT.md, and friends into the developer_rules nodeset
@@ -91,7 +92,7 @@ To use different LLM providers / database configurations, and for more info chec
## 🐳 Docker Usage

If you'd rather run cognee-mcp in a container, you have two options:

1. **Build locally**
   1. Make sure you are in /cognee root directory and have a fresh `.env` containing only your `LLM_API_KEY` (and your chosen settings).
@@ -128,6 +129,64 @@ If you'd rather run cognee-mcp in a container, you have two options:
- ✅ Direct: `python src/server.py --transport http`
- ❌ Direct: `-e TRANSPORT_MODE=http` (won't work)
### **Docker API Mode**

To connect the MCP Docker container to a Cognee API server running on your host machine:

#### **Simple Usage (Automatic localhost handling):**
```bash
# Start your Cognee API server on the host
python -m cognee.api.client
# Run MCP container in API mode - localhost is automatically converted!
docker run \
-e TRANSPORT_MODE=sse \
-e API_URL=http://localhost:8000 \
-e API_TOKEN=your_auth_token \
-p 8001:8000 \
--rm -it cognee/cognee-mcp:main
```
**Note:** The container will automatically convert `localhost` to `host.docker.internal` on Mac/Windows/Docker Desktop. You'll see a message in the logs showing the conversion.
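The rewrite the entrypoint performs is a plain string substitution. As a rough Python sketch of the same rule (the helper name here is illustrative, not part of the PR):

```python
def to_host_accessible(api_url: str) -> str:
    """Rewrite localhost/127.0.0.1 to host.docker.internal, as entrypoint.sh does with sed."""
    return api_url.replace("localhost", "host.docker.internal").replace(
        "127.0.0.1", "host.docker.internal"
    )

print(to_host_accessible("http://localhost:8000"))  # http://host.docker.internal:8000
```

On Linux without Docker Desktop the rewritten hostname won't resolve, which is why the `--network host` fallback exists.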
#### **Explicit host.docker.internal (Mac/Windows):**
```bash
# Or explicitly use host.docker.internal
docker run \
-e TRANSPORT_MODE=sse \
-e API_URL=http://host.docker.internal:8000 \
-e API_TOKEN=your_auth_token \
-p 8001:8000 \
--rm -it cognee/cognee-mcp:main
```
#### **On Linux (use host network or container IP):**
```bash
# Option 1: Use host network (simplest)
docker run \
--network host \
-e TRANSPORT_MODE=sse \
-e API_URL=http://localhost:8000 \
-e API_TOKEN=your_auth_token \
--rm -it cognee/cognee-mcp:main
# Option 2: Use host IP address
# First, get your host IP: ip addr show docker0
docker run \
-e TRANSPORT_MODE=sse \
-e API_URL=http://172.17.0.1:8000 \
-e API_TOKEN=your_auth_token \
-p 8001:8000 \
--rm -it cognee/cognee-mcp:main
```
**Environment variables for API mode:**

- `API_URL`: URL of the running Cognee API server
- `API_TOKEN`: Authentication token (optional, required if API has authentication enabled)

**Note:** When running in API mode:

- Database migrations are automatically skipped (API server handles its own DB)
- Some features are limited (see [API Mode Limitations](#-api-mode))
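When `API_TOKEN` is set, the MCP server attaches it as a bearer token to every request. A minimal sketch of the header logic used by this PR's `CogneeClient._get_headers`:

```python
from typing import Dict, Optional

def api_headers(api_token: Optional[str]) -> Dict[str, str]:
    # Same shape as CogneeClient._get_headers in this PR: JSON content type,
    # plus an Authorization header only when a token is configured.
    headers = {"Content-Type": "application/json"}
    if api_token:
        headers["Authorization"] = f"Bearer {api_token}"
    return headers
```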
## 🔗 MCP Client Configuration
@@ -255,6 +314,76 @@ You can configure both transports simultaneously for testing:
**Note:** Only enable the server you're actually running to avoid connection errors.
## 🌐 API Mode

The MCP server can operate in two modes:

### **Direct Mode** (Default)

The MCP server directly imports and uses the cognee library. This is the default mode with full feature support.

### **API Mode**

The MCP server connects to an already running Cognee FastAPI server via HTTP requests. This is useful when:

- You have a centralized Cognee API server running
- You want to separate the MCP server from the knowledge graph backend
- You need multiple MCP servers to share the same knowledge graph
**Starting the MCP server in API mode:**
```bash
# Start your Cognee FastAPI server first (default port 8000)
cd /path/to/cognee
python -m cognee.api.client
# Then start the MCP server in API mode
cd cognee-mcp
python src/server.py --api-url http://localhost:8000 --api-token YOUR_AUTH_TOKEN
```
**API Mode with different transports:**
```bash
# With SSE transport
python src/server.py --transport sse --api-url http://localhost:8000 --api-token YOUR_TOKEN
# With HTTP transport
python src/server.py --transport http --api-url http://localhost:8000 --api-token YOUR_TOKEN
```
**API Mode with Docker:**
```bash
# On Mac/Windows (use host.docker.internal to access host)
docker run \
-e TRANSPORT_MODE=sse \
-e API_URL=http://host.docker.internal:8000 \
-e API_TOKEN=YOUR_TOKEN \
-p 8001:8000 \
--rm -it cognee/cognee-mcp:main
# On Linux (use host network)
docker run \
--network host \
-e TRANSPORT_MODE=sse \
-e API_URL=http://localhost:8000 \
-e API_TOKEN=YOUR_TOKEN \
--rm -it cognee/cognee-mcp:main
```
**Command-line arguments for API mode:**

- `--api-url`: Base URL of the running Cognee FastAPI server (e.g., `http://localhost:8000`)
- `--api-token`: Authentication token for the API (optional, required if API has authentication enabled)
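A simplified sketch of how this pair of flags can be parsed with argparse (the real server.py defines more options; only the API-mode pair is shown here):

```python
import argparse

# Only the two API-mode flags from this PR; transport/host/port options omitted.
parser = argparse.ArgumentParser()
parser.add_argument("--api-url", default=None, help="Base URL of a running Cognee FastAPI server")
parser.add_argument("--api-token", default=None, help="Optional bearer token for the API")

args = parser.parse_args(["--api-url", "http://localhost:8000"])
# args.api_url is "http://localhost:8000"; --api-token was omitted, so args.api_token is None
```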
**Docker environment variables for API mode:**

- `API_URL`: Base URL of the running Cognee FastAPI server
- `API_TOKEN`: Authentication token (optional, required if API has authentication enabled)
**API Mode limitations:**

Some features are only available in direct mode:

- `codify` (code graph pipeline)
- `cognify_status` / `codify_status` (pipeline status tracking)
- `prune` (data reset)
- `get_developer_rules` (developer rules retrieval)
- `list_data` with specific dataset_id (detailed data listing)

Basic operations like `cognify`, `search`, `delete`, and `list_data` (all datasets) work in both modes.
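The gating behind these limitations is a single `use_api` flag. A toy sketch (the class name is hypothetical, not the real `CogneeClient`) of how direct-only operations refuse to run in API mode, mirroring this PR's `prune_data`:

```python
class ModeGate:
    """Illustrative stand-in for CogneeClient's mode flag; not the real class."""

    def __init__(self, api_url=None):
        self.use_api = bool(api_url)  # API mode iff an api_url was given

    def prune_data(self):
        if self.use_api:
            # Mirrors the PR: the API exposes no prune endpoint, so API mode raises.
            raise NotImplementedError("Prune operation is not available via API")
        return {"status": "success", "message": "Data pruned successfully"}
```

Direct mode (`ModeGate()`) prunes; `ModeGate("http://localhost:8000")` raises `NotImplementedError`.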
## 💻 Basic Usage

The MCP server exposes its functionality through tools. Call them from any MCP client (Cursor, Claude Desktop, Cline, Roo and more).


@@ -14,61 +14,94 @@ HTTP_PORT=${HTTP_PORT:-8000}
echo "Debug port: $DEBUG_PORT"
echo "HTTP port: $HTTP_PORT"

# Check if API mode is enabled
if [ -n "$API_URL" ]; then
  echo "API mode enabled: $API_URL"
  echo "Skipping database migrations (API server handles its own database)"
else
  echo "Direct mode: Using local cognee instance"

  # Run Alembic migrations with proper error handling.
  # Note on UserAlreadyExists error handling:
  # During database migrations, we attempt to create a default user. If this user
  # already exists (e.g., from a previous deployment or migration), it's not a
  # critical error and shouldn't prevent the application from starting. This is
  # different from other migration errors which could indicate database schema
  # inconsistencies and should cause the startup to fail. This check allows for
  # smooth redeployments and container restarts while maintaining data integrity.
  echo "Running database migrations..."
  MIGRATION_OUTPUT=$(alembic upgrade head)
  MIGRATION_EXIT_CODE=$?

  if [[ $MIGRATION_EXIT_CODE -ne 0 ]]; then
    if [[ "$MIGRATION_OUTPUT" == *"UserAlreadyExists"* ]] || [[ "$MIGRATION_OUTPUT" == *"User default_user@example.com already exists"* ]]; then
      echo "Warning: Default user already exists, continuing startup..."
    else
      echo "Migration failed with unexpected error."
      exit 1
    fi
  fi

  echo "Database migrations done."
fi

echo "Starting Cognee MCP Server with transport mode: $TRANSPORT_MODE"

# Add startup delay to ensure DB is ready
sleep 2

# Build API arguments if API_URL is set
API_ARGS=""
if [ -n "$API_URL" ]; then
  # Handle localhost in API_URL - convert to host-accessible address
  if echo "$API_URL" | grep -q "localhost" || echo "$API_URL" | grep -q "127.0.0.1"; then
    echo "⚠️ Warning: API_URL contains localhost/127.0.0.1"
    echo "   Original: $API_URL"
    # Try to use host.docker.internal (works on Mac/Windows and recent Linux with Docker Desktop)
    FIXED_API_URL=$(echo "$API_URL" | sed 's/localhost/host.docker.internal/g' | sed 's/127\.0\.0\.1/host.docker.internal/g')
    echo "   Converted to: $FIXED_API_URL"
    echo "   This will work on Mac/Windows/Docker Desktop."
    echo "   On Linux without Docker Desktop, you may need to:"
    echo "   - Use --network host, OR"
    echo "   - Set API_URL=http://172.17.0.1:8000 (Docker bridge IP)"
    API_URL="$FIXED_API_URL"
  fi

  API_ARGS="--api-url $API_URL"
  if [ -n "$API_TOKEN" ]; then
    API_ARGS="$API_ARGS --api-token $API_TOKEN"
  fi
fi

# Modified startup with transport mode selection and error handling
if [ "$ENVIRONMENT" = "dev" ] || [ "$ENVIRONMENT" = "local" ]; then
  if [ "$DEBUG" = "true" ]; then
    echo "Waiting for the debugger to attach..."
    if [ "$TRANSPORT_MODE" = "sse" ]; then
      exec python -m debugpy --wait-for-client --listen 0.0.0.0:$DEBUG_PORT -m cognee-mcp --transport sse --host 0.0.0.0 --port $HTTP_PORT --no-migration $API_ARGS
    elif [ "$TRANSPORT_MODE" = "http" ]; then
      exec python -m debugpy --wait-for-client --listen 0.0.0.0:$DEBUG_PORT -m cognee-mcp --transport http --host 0.0.0.0 --port $HTTP_PORT --no-migration $API_ARGS
    else
      exec python -m debugpy --wait-for-client --listen 0.0.0.0:$DEBUG_PORT -m cognee-mcp --transport stdio --no-migration $API_ARGS
    fi
  else
    if [ "$TRANSPORT_MODE" = "sse" ]; then
      exec cognee-mcp --transport sse --host 0.0.0.0 --port $HTTP_PORT --no-migration $API_ARGS
    elif [ "$TRANSPORT_MODE" = "http" ]; then
      exec cognee-mcp --transport http --host 0.0.0.0 --port $HTTP_PORT --no-migration $API_ARGS
    else
      exec cognee-mcp --transport stdio --no-migration $API_ARGS
    fi
  fi
else
  if [ "$TRANSPORT_MODE" = "sse" ]; then
    exec cognee-mcp --transport sse --host 0.0.0.0 --port $HTTP_PORT --no-migration $API_ARGS
  elif [ "$TRANSPORT_MODE" = "http" ]; then
    exec cognee-mcp --transport http --host 0.0.0.0 --port $HTTP_PORT --no-migration $API_ARGS
  else
    exec cognee-mcp --transport stdio --no-migration $API_ARGS
  fi
fi


@@ -13,6 +13,7 @@ dependencies = [
    "fastmcp>=2.10.0,<3.0.0",
    "mcp>=1.12.0,<2.0.0",
    "uv>=0.6.3,<1.0.0",
    "httpx>=0.27.0,<1.0.0",
]
authors = [


@@ -1,4 +1,7 @@
try:
    from .server import main as server_main
except ImportError:
    from server import main as server_main

import warnings
import sys


@ -0,0 +1,338 @@
"""
Cognee Client abstraction that supports both direct function calls and HTTP API calls.
This module provides a unified interface for interacting with Cognee, supporting:
- Direct mode: Directly imports and calls cognee functions (default behavior)
- API mode: Makes HTTP requests to a running Cognee FastAPI server
"""
import sys
from typing import Optional, Any, List, Dict
from uuid import UUID
from contextlib import redirect_stdout
import httpx
from cognee.shared.logging_utils import get_logger
import json
logger = get_logger()
class CogneeClient:
"""
Unified client for interacting with Cognee via direct calls or HTTP API.
Parameters
----------
api_url : str, optional
Base URL of the Cognee API server (e.g., "http://localhost:8000").
If None, uses direct cognee function calls.
api_token : str, optional
Authentication token for the API (optional, required if API has authentication enabled).
"""
def __init__(self, api_url: Optional[str] = None, api_token: Optional[str] = None):
self.api_url = api_url.rstrip("/") if api_url else None
self.api_token = api_token
self.use_api = bool(api_url)
if self.use_api:
logger.info(f"Cognee client initialized in API mode: {self.api_url}")
self.client = httpx.AsyncClient(timeout=300.0) # 5 minute timeout for long operations
else:
logger.info("Cognee client initialized in direct mode")
# Import cognee only if we're using direct mode
import cognee as _cognee
self.cognee = _cognee
def _get_headers(self) -> Dict[str, str]:
"""Get headers for API requests."""
headers = {"Content-Type": "application/json"}
if self.api_token:
headers["Authorization"] = f"Bearer {self.api_token}"
return headers
    async def add(
        self, data: Any, dataset_name: str = "main_dataset", node_set: Optional[List[str]] = None
    ) -> Dict[str, Any]:
        """
        Add data to Cognee for processing.

        Parameters
        ----------
        data : Any
            Data to add (text, file path, etc.)
        dataset_name : str
            Name of the dataset to add data to
        node_set : List[str], optional
            List of node identifiers for graph organization

        Returns
        -------
        Dict[str, Any]
            Result of the add operation
        """
        if self.use_api:
            endpoint = f"{self.api_url}/api/v1/add"
            files = {"data": ("data.txt", str(data), "text/plain")}
            form_data = {
                "datasetName": dataset_name,
            }
            if node_set is not None:
                form_data["node_set"] = json.dumps(node_set)

            response = await self.client.post(
                endpoint,
                files=files,
                data=form_data,
                headers={"Authorization": f"Bearer {self.api_token}"} if self.api_token else {},
            )
            response.raise_for_status()
            return response.json()
        else:
            with redirect_stdout(sys.stderr):
                await self.cognee.add(data, dataset_name=dataset_name, node_set=node_set)
                return {"status": "success", "message": "Data added successfully"}
    async def cognify(
        self,
        datasets: Optional[List[str]] = None,
        custom_prompt: Optional[str] = None,
        graph_model: Any = None,
    ) -> Dict[str, Any]:
        """
        Transform data into a knowledge graph.

        Parameters
        ----------
        datasets : List[str], optional
            List of dataset names to process
        custom_prompt : str, optional
            Custom prompt for entity extraction
        graph_model : Any, optional
            Custom graph model (only used in direct mode)

        Returns
        -------
        Dict[str, Any]
            Result of the cognify operation
        """
        if self.use_api:
            # API mode: Make HTTP request
            endpoint = f"{self.api_url}/api/v1/cognify"
            payload = {
                "datasets": datasets or ["main_dataset"],
                "run_in_background": False,
            }
            if custom_prompt:
                payload["custom_prompt"] = custom_prompt

            response = await self.client.post(endpoint, json=payload, headers=self._get_headers())
            response.raise_for_status()
            return response.json()
        else:
            # Direct mode: Call cognee directly
            with redirect_stdout(sys.stderr):
                kwargs = {}
                if datasets:
                    kwargs["datasets"] = datasets
                if custom_prompt:
                    kwargs["custom_prompt"] = custom_prompt
                if graph_model:
                    kwargs["graph_model"] = graph_model

                await self.cognee.cognify(**kwargs)
                return {"status": "success", "message": "Cognify completed successfully"}
    async def search(
        self,
        query_text: str,
        query_type: str,
        datasets: Optional[List[str]] = None,
        system_prompt: Optional[str] = None,
        top_k: int = 10,
    ) -> Any:
        """
        Search the knowledge graph.

        Parameters
        ----------
        query_text : str
            The search query
        query_type : str
            Type of search (e.g., "GRAPH_COMPLETION", "INSIGHTS", etc.)
        datasets : List[str], optional
            List of datasets to search
        system_prompt : str, optional
            System prompt for completion searches
        top_k : int
            Maximum number of results

        Returns
        -------
        Any
            Search results
        """
        if self.use_api:
            # API mode: Make HTTP request
            endpoint = f"{self.api_url}/api/v1/search"
            payload = {"query": query_text, "search_type": query_type.upper(), "top_k": top_k}
            if datasets:
                payload["datasets"] = datasets
            if system_prompt:
                payload["system_prompt"] = system_prompt

            response = await self.client.post(endpoint, json=payload, headers=self._get_headers())
            response.raise_for_status()
            return response.json()
        else:
            # Direct mode: Call cognee directly
            from cognee.modules.search.types import SearchType

            with redirect_stdout(sys.stderr):
                results = await self.cognee.search(
                    query_type=SearchType[query_type.upper()], query_text=query_text
                )
                return results
    async def delete(self, data_id: UUID, dataset_id: UUID, mode: str = "soft") -> Dict[str, Any]:
        """
        Delete data from a dataset.

        Parameters
        ----------
        data_id : UUID
            ID of the data to delete
        dataset_id : UUID
            ID of the dataset containing the data
        mode : str
            Deletion mode ("soft" or "hard")

        Returns
        -------
        Dict[str, Any]
            Result of the deletion
        """
        if self.use_api:
            # API mode: Make HTTP request
            endpoint = f"{self.api_url}/api/v1/delete"
            params = {"data_id": str(data_id), "dataset_id": str(dataset_id), "mode": mode}

            response = await self.client.delete(
                endpoint, params=params, headers=self._get_headers()
            )
            response.raise_for_status()
            return response.json()
        else:
            # Direct mode: Call cognee directly
            from cognee.modules.users.methods import get_default_user

            with redirect_stdout(sys.stderr):
                user = await get_default_user()
                result = await self.cognee.delete(
                    data_id=data_id, dataset_id=dataset_id, mode=mode, user=user
                )
                return result
    async def prune_data(self) -> Dict[str, Any]:
        """
        Prune all data from the knowledge graph.

        Returns
        -------
        Dict[str, Any]
            Result of the prune operation
        """
        if self.use_api:
            # Note: The API doesn't expose a prune endpoint, so we'll need to handle this
            # For now, raise an error
            raise NotImplementedError("Prune operation is not available via API")
        else:
            # Direct mode: Call cognee directly
            with redirect_stdout(sys.stderr):
                await self.cognee.prune.prune_data()
                return {"status": "success", "message": "Data pruned successfully"}

    async def prune_system(self, metadata: bool = True) -> Dict[str, Any]:
        """
        Prune system data from the knowledge graph.

        Parameters
        ----------
        metadata : bool
            Whether to prune metadata

        Returns
        -------
        Dict[str, Any]
            Result of the prune operation
        """
        if self.use_api:
            # Note: The API doesn't expose a prune endpoint
            raise NotImplementedError("Prune system operation is not available via API")
        else:
            # Direct mode: Call cognee directly
            with redirect_stdout(sys.stderr):
                await self.cognee.prune.prune_system(metadata=metadata)
                return {"status": "success", "message": "System pruned successfully"}
    async def get_pipeline_status(self, dataset_ids: List[UUID], pipeline_name: str) -> str:
        """
        Get the status of a pipeline run.

        Parameters
        ----------
        dataset_ids : List[UUID]
            List of dataset IDs
        pipeline_name : str
            Name of the pipeline

        Returns
        -------
        str
            Status information
        """
        if self.use_api:
            # Note: This would need a custom endpoint on the API side
            raise NotImplementedError("Pipeline status is not available via API")
        else:
            # Direct mode: Call cognee directly
            from cognee.modules.pipelines.operations.get_pipeline_status import get_pipeline_status

            with redirect_stdout(sys.stderr):
                status = await get_pipeline_status(dataset_ids, pipeline_name)
                return str(status)

    async def list_datasets(self) -> List[Dict[str, Any]]:
        """
        List all datasets.

        Returns
        -------
        List[Dict[str, Any]]
            List of datasets
        """
        if self.use_api:
            # API mode: Make HTTP request
            endpoint = f"{self.api_url}/api/v1/datasets"
            response = await self.client.get(endpoint, headers=self._get_headers())
            response.raise_for_status()
            return response.json()
        else:
            # Direct mode: Call cognee directly
            from cognee.modules.users.methods import get_default_user
            from cognee.modules.data.methods import get_datasets

            with redirect_stdout(sys.stderr):
                user = await get_default_user()
                datasets = await get_datasets(user.id)
                return [
                    {"id": str(d.id), "name": d.name, "created_at": str(d.created_at)}
                    for d in datasets
                ]
    async def close(self):
        """Close the HTTP client if in API mode."""
        if self.use_api and hasattr(self, "client"):
            await self.client.aclose()


@@ -2,28 +2,27 @@ import json
import os
import sys
import argparse
import asyncio
import subprocess
from pathlib import Path
from typing import Optional
from cognee.shared.logging_utils import get_logger, setup_logging, get_log_file_location
import importlib.util
from contextlib import redirect_stdout

import mcp.types as types
from mcp.server import FastMCP

from cognee.modules.storage.utils import JSONEncoder
from starlette.responses import JSONResponse
from starlette.middleware import Middleware
from starlette.middleware.cors import CORSMiddleware
import uvicorn

try:
    from .cognee_client import CogneeClient
except ImportError:
    from cognee_client import CogneeClient

try:
    from cognee.tasks.codingagents.coding_rule_associations import (
@@ -41,6 +40,8 @@ mcp = FastMCP("Cognee")
logger = get_logger()

cognee_client: Optional[CogneeClient] = None


async def run_sse_with_cors():
    """Custom SSE transport with CORS middleware."""
@@ -141,11 +142,20 @@ async def cognee_add_developer_rules(
    with redirect_stdout(sys.stderr):
        logger.info(f"Starting cognify for: {file_path}")
        try:
            await cognee_client.add(file_path, node_set=["developer_rules"])

            model = None
            if graph_model_file and graph_model_name:
                if cognee_client.use_api:
                    logger.warning(
                        "Custom graph models are not supported in API mode, ignoring."
                    )
                else:
                    from cognee.shared.data_models import KnowledgeGraph

                    model = load_class(graph_model_file, graph_model_name)

            await cognee_client.cognify(graph_model=model)
            logger.info(f"Cognify finished for: {file_path}")
        except Exception as e:
            logger.error(f"Cognify failed for {file_path}: {str(e)}")
@@ -293,15 +303,20 @@ async def cognify(
    # going to stdout ( like the print function ) to stderr.
    with redirect_stdout(sys.stderr):
        logger.info("Cognify process starting.")

        graph_model = None
        if graph_model_file and graph_model_name:
            if cognee_client.use_api:
                logger.warning("Custom graph models are not supported in API mode, ignoring.")
            else:
                from cognee.shared.data_models import KnowledgeGraph

                graph_model = load_class(graph_model_file, graph_model_name)

        await cognee_client.add(data)

        try:
            await cognee_client.cognify(custom_prompt=custom_prompt, graph_model=graph_model)
            logger.info("Cognify process finished.")
        except Exception as e:
            logger.error("Cognify process failed.")
@@ -354,16 +369,19 @@ async def save_interaction(data: str) -> list:
    with redirect_stdout(sys.stderr):
        logger.info("Save interaction process starting.")

        await cognee_client.add(data, node_set=["user_agent_interaction"])

        try:
            await cognee_client.cognify()
            logger.info("Save interaction process finished.")

            # Rule associations only work in direct mode
            if not cognee_client.use_api:
                logger.info("Generating associated rules from interaction data.")
                await add_rule_associations(data=data, rules_nodeset_name="coding_agent_rules")
                logger.info("Associated rules generated from interaction data.")
            else:
                logger.warning("Rule associations are not available in API mode, skipping.")
        except Exception as e:
            logger.error("Save interaction process failed.")
@@ -420,11 +438,18 @@ async def codify(repo_path: str) -> list:
    - All stdout is redirected to stderr to maintain MCP communication integrity
    """
    if cognee_client.use_api:
        error_msg = "❌ Codify operation is not available in API mode. Please use direct mode for code graph pipeline."
        logger.error(error_msg)
        return [types.TextContent(type="text", text=error_msg)]

    async def codify_task(repo_path: str):
        # NOTE: MCP uses stdout to communicate, we must redirect all output
        # going to stdout ( like the print function ) to stderr.
        with redirect_stdout(sys.stderr):
            logger.info("Codify process starting.")
            from cognee.api.v1.cognify.code_graph_pipeline import run_code_graph_pipeline

            results = []
            async for result in run_code_graph_pipeline(repo_path, False):
                results.append(result)
@@ -566,20 +591,40 @@ async def search(search_query: str, search_type: str) -> list:
         # NOTE: MCP uses stdout to communicate, we must redirect all output
         # going to stdout ( like the print function ) to stderr.
         with redirect_stdout(sys.stderr):
-            search_results = await cognee.search(
-                query_type=SearchType[search_type.upper()], query_text=search_query
+            search_results = await cognee_client.search(
+                query_text=search_query, query_type=search_type
             )

-            if search_type.upper() == "CODE":
-                return json.dumps(search_results, cls=JSONEncoder)
-            elif (
-                search_type.upper() == "GRAPH_COMPLETION" or search_type.upper() == "RAG_COMPLETION"
-            ):
-                return str(search_results[0])
-            elif search_type.upper() == "CHUNKS":
-                return str(search_results)
-            else:
-                return str(search_results)
+            # Handle different result formats based on API vs direct mode
+            if cognee_client.use_api:
+                # API mode returns JSON-serialized results
+                if isinstance(search_results, str):
+                    return search_results
+                elif isinstance(search_results, list):
+                    if (
+                        search_type.upper() in ["GRAPH_COMPLETION", "RAG_COMPLETION"]
+                        and len(search_results) > 0
+                    ):
+                        return str(search_results[0])
+                    return str(search_results)
+                else:
+                    return json.dumps(search_results, cls=JSONEncoder)
+            else:
+                # Direct mode processing
+                if search_type.upper() == "CODE":
+                    return json.dumps(search_results, cls=JSONEncoder)
+                elif (
+                    search_type.upper() == "GRAPH_COMPLETION"
+                    or search_type.upper() == "RAG_COMPLETION"
+                ):
+                    return str(search_results[0])
+                elif search_type.upper() == "CHUNKS":
+                    return str(search_results)
+                elif search_type.upper() == "INSIGHTS":
+                    results = retrieved_edges_to_string(search_results)
+                    return results
+                else:
+                    return str(search_results)

         search_results = await search_task(search_query, search_type)
         return [types.TextContent(type="text", text=search_results)]
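The mode-dependent result handling above can be sketched as a standalone helper. This is a hypothetical illustration of the branching logic in the diff, not the actual cognee-mcp code; the result shapes assumed for API mode (a JSON string or a list of hits) are inferred from the branches shown:

```python
import json


def normalize_search_results(search_results, search_type: str, use_api: bool) -> str:
    """Collapse API-mode and direct-mode search results into a single string."""
    if use_api:
        # API mode may already return a JSON string, or a list of hits
        if isinstance(search_results, str):
            return search_results
        if isinstance(search_results, list):
            if search_type.upper() in ["GRAPH_COMPLETION", "RAG_COMPLETION"] and search_results:
                return str(search_results[0])
            return str(search_results)
        return json.dumps(search_results)
    # Direct mode: completion searches return a list whose first item is the answer
    if search_type.upper() in ["GRAPH_COMPLETION", "RAG_COMPLETION"]:
        return str(search_results[0])
    return str(search_results)
```

The point of the split is that API mode cannot rely on direct-mode helpers like `JSONEncoder` or `retrieved_edges_to_string`, so it falls back to generic string/JSON handling.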
@@ -612,6 +657,10 @@ async def get_developer_rules() -> list:
     async def fetch_rules_from_cognee() -> str:
         """Collect all developer rules from Cognee"""
         with redirect_stdout(sys.stderr):
+            if cognee_client.use_api:
+                logger.warning("Developer rules retrieval is not available in API mode")
+                return "Developer rules retrieval is not available in API mode"
+
             developer_rules = await get_existing_rules(rules_nodeset_name="coding_agent_rules")
             return developer_rules
@@ -651,16 +700,24 @@ async def list_data(dataset_id: str = None) -> list:
         with redirect_stdout(sys.stderr):
             try:
-                user = await get_default_user()
                 output_lines = []

                 if dataset_id:
-                    # List data for specific dataset
+                    # Detailed data listing for specific dataset is only available in direct mode
+                    if cognee_client.use_api:
+                        return [
+                            types.TextContent(
+                                type="text",
+                                text="❌ Detailed data listing for specific datasets is not available in API mode.\nPlease use the API directly or use direct mode.",
+                            )
+                        ]
+
+                    from cognee.modules.users.methods import get_default_user
+                    from cognee.modules.data.methods import get_dataset, get_dataset_data
+
                     logger.info(f"Listing data for dataset: {dataset_id}")
                     dataset_uuid = UUID(dataset_id)
+                    user = await get_default_user()

-                    # Get the dataset information
-                    from cognee.modules.data.methods import get_dataset, get_dataset_data
                     dataset = await get_dataset(user.id, dataset_uuid)
@@ -689,11 +746,9 @@ async def list_data(dataset_id: str = None) -> list:
                         output_lines.append("   (No data items in this dataset)")
                 else:
-                    # List all datasets
+                    # List all datasets - works in both modes
                     logger.info("Listing all datasets")
-                    from cognee.modules.data.methods import get_datasets
-
-                    datasets = await get_datasets(user.id)
+                    datasets = await cognee_client.list_datasets()

                     if not datasets:
                         return [
@@ -708,20 +763,21 @@ async def list_data(dataset_id: str = None) -> list:
                     output_lines.append("")

                     for i, dataset in enumerate(datasets, 1):
-                        # Get data count for each dataset
-                        from cognee.modules.data.methods import get_dataset_data
-
-                        data_items = await get_dataset_data(dataset.id)
-
-                        output_lines.append(f"{i}. 📁 {dataset.name}")
-                        output_lines.append(f"   Dataset ID: {dataset.id}")
-                        output_lines.append(f"   Created: {dataset.created_at}")
-                        output_lines.append(f"   Data items: {len(data_items)}")
+                        # In API mode, dataset is a dict; in direct mode, it's formatted as dict
+                        if isinstance(dataset, dict):
+                            output_lines.append(f"{i}. 📁 {dataset.get('name', 'Unnamed')}")
+                            output_lines.append(f"   Dataset ID: {dataset.get('id')}")
+                            output_lines.append(f"   Created: {dataset.get('created_at', 'N/A')}")
+                        else:
+                            output_lines.append(f"{i}. 📁 {dataset.name}")
+                            output_lines.append(f"   Dataset ID: {dataset.id}")
+                            output_lines.append(f"   Created: {dataset.created_at}")
                         output_lines.append("")

-                    output_lines.append("💡 To see data items in a specific dataset, use:")
-                    output_lines.append('   list_data(dataset_id="your-dataset-id-here")')
-                    output_lines.append("")
+                    if not cognee_client.use_api:
+                        output_lines.append("💡 To see data items in a specific dataset, use:")
+                        output_lines.append('   list_data(dataset_id="your-dataset-id-here")')
+                        output_lines.append("")

                     output_lines.append("🗑️ To delete specific data, use:")
                     output_lines.append('   delete(data_id="data-id", dataset_id="dataset-id")')
@@ -790,12 +846,9 @@ async def delete(data_id: str, dataset_id: str, mode: str = "soft") -> list:
                 data_uuid = UUID(data_id)
                 dataset_uuid = UUID(dataset_id)

-                # Get default user for the operation
-                user = await get_default_user()
-
-                # Call the cognee delete function
-                result = await cognee.delete(
-                    data_id=data_uuid, dataset_id=dataset_uuid, mode=mode, user=user
+                # Call the cognee delete function via client
+                result = await cognee_client.delete(
+                    data_id=data_uuid, dataset_id=dataset_uuid, mode=mode
                 )

                 logger.info(f"Delete operation completed successfully: {result}")
@@ -842,11 +895,21 @@ async def prune():
     -----
    - This operation cannot be undone. All memory data will be permanently deleted.
    - The function prunes both data content (using prune_data) and system metadata (using prune_system)
+    - This operation is not available in API mode
    """
    with redirect_stdout(sys.stderr):
-        await cognee.prune.prune_data()
-        await cognee.prune.prune_system(metadata=True)
-        return [types.TextContent(type="text", text="Pruned")]
+        try:
+            await cognee_client.prune_data()
+            await cognee_client.prune_system(metadata=True)
+            return [types.TextContent(type="text", text="Pruned")]
+        except NotImplementedError:
+            error_msg = "❌ Prune operation is not available in API mode"
+            logger.error(error_msg)
+            return [types.TextContent(type="text", text=error_msg)]
+        except Exception as e:
+            error_msg = f"❌ Prune operation failed: {str(e)}"
+            logger.error(error_msg)
+            return [types.TextContent(type="text", text=error_msg)]
 @mcp.tool()
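The prune, cognify_status, and codify_status tools all rely on the client raising `NotImplementedError` for operations the remote API does not expose. A minimal sketch of that gating pattern (the class and method names are hypothetical, not the real `CogneeClient`):

```python
import asyncio


class SketchClient:
    """Hypothetical stand-in for a client that is direct by default, API-backed if given a URL."""

    def __init__(self, api_url=None):
        self.use_api = api_url is not None

    async def prune_data(self) -> str:
        if self.use_api:
            # The remote API does not expose prune, so signal "unsupported"
            raise NotImplementedError("Prune is not available in API mode")
        return "pruned"


async def prune(client: SketchClient) -> str:
    """Tool-side handler: convert the unsupported-capability signal into a user message."""
    try:
        return await client.prune_data()
    except NotImplementedError:
        return "❌ Prune operation is not available in API mode"
```

Raising from the client rather than checking `use_api` in every tool keeps the capability knowledge in one place; tools only need a uniform `except NotImplementedError` branch.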
@@ -869,13 +932,26 @@ async def cognify_status():
    - The function retrieves pipeline status specifically for the "cognify_pipeline" on the "main_dataset"
    - Status information includes job progress, execution time, and completion status
    - The status is returned in string format for easy reading
+    - This operation is not available in API mode
    """
    with redirect_stdout(sys.stderr):
-        user = await get_default_user()
-        status = await get_pipeline_status(
-            [await get_unique_dataset_id("main_dataset", user)], "cognify_pipeline"
-        )
-        return [types.TextContent(type="text", text=str(status))]
+        try:
+            from cognee.modules.data.methods.get_unique_dataset_id import get_unique_dataset_id
+            from cognee.modules.users.methods import get_default_user
+
+            user = await get_default_user()
+            status = await cognee_client.get_pipeline_status(
+                [await get_unique_dataset_id("main_dataset", user)], "cognify_pipeline"
+            )
+            return [types.TextContent(type="text", text=str(status))]
+        except NotImplementedError:
+            error_msg = "❌ Pipeline status is not available in API mode"
+            logger.error(error_msg)
+            return [types.TextContent(type="text", text=error_msg)]
+        except Exception as e:
+            error_msg = f"❌ Failed to get cognify status: {str(e)}"
+            logger.error(error_msg)
+            return [types.TextContent(type="text", text=error_msg)]


 @mcp.tool()
@@ -898,13 +974,26 @@ async def codify_status():
    - The function retrieves pipeline status specifically for the "cognify_code_pipeline" on the "codebase" dataset
    - Status information includes job progress, execution time, and completion status
    - The status is returned in string format for easy reading
+    - This operation is not available in API mode
    """
    with redirect_stdout(sys.stderr):
-        user = await get_default_user()
-        status = await get_pipeline_status(
-            [await get_unique_dataset_id("codebase", user)], "cognify_code_pipeline"
-        )
-        return [types.TextContent(type="text", text=str(status))]
+        try:
+            from cognee.modules.data.methods.get_unique_dataset_id import get_unique_dataset_id
+            from cognee.modules.users.methods import get_default_user
+
+            user = await get_default_user()
+            status = await cognee_client.get_pipeline_status(
+                [await get_unique_dataset_id("codebase", user)], "cognify_code_pipeline"
+            )
+            return [types.TextContent(type="text", text=str(status))]
+        except NotImplementedError:
+            error_msg = "❌ Pipeline status is not available in API mode"
+            logger.error(error_msg)
+            return [types.TextContent(type="text", text=error_msg)]
+        except Exception as e:
+            error_msg = f"❌ Failed to get codify status: {str(e)}"
+            logger.error(error_msg)
+            return [types.TextContent(type="text", text=error_msg)]


 def node_to_string(node):
@@ -938,6 +1027,8 @@ def load_class(model_file, model_name):

 async def main():
+    global cognee_client
+
     parser = argparse.ArgumentParser()

     parser.add_argument(
@@ -981,12 +1072,30 @@ async def main():
        help="Argument stops database migration from being attempted",
    )

+    # Cognee API connection options
+    parser.add_argument(
+        "--api-url",
+        default=None,
+        help="Base URL of a running Cognee FastAPI server (e.g., http://localhost:8000). "
+        "If provided, the MCP server will connect to the API instead of using cognee directly.",
+    )
+    parser.add_argument(
+        "--api-token",
+        default=None,
+        help="Authentication token for the API (optional, required if API has authentication enabled).",
+    )
+
    args = parser.parse_args()

+    # Initialize the global CogneeClient
+    cognee_client = CogneeClient(api_url=args.api_url, api_token=args.api_token)
+
    mcp.settings.host = args.host
    mcp.settings.port = args.port

-    if not args.no_migration:
+    # Skip migrations when in API mode (the API server handles its own database)
+    if not args.no_migration and not args.api_url:
        # Run Alembic migrations from the main cognee directory where alembic.ini is located
        logger.info("Running database migrations...")
        migration_result = subprocess.run(
@@ -1009,6 +1118,8 @@ async def main():
            sys.exit(1)

        logger.info("Database migrations done.")
+    elif args.api_url:
+        logger.info("Skipping database migrations (using API mode)")

    logger.info(f"Starting MCP server with transport: {args.transport}")
    if args.transport == "stdio":
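The diff assumes a module-level `CogneeClient` that either calls cognee in-process or forwards requests to a running FastAPI server over HTTP (the PR adds `httpx` for this). A rough sketch of that dispatch: only the constructor signature `(api_url, api_token)` comes from the diff, and the endpoint path used here is a placeholder guess, not cognee's verified API:

```python
from typing import Optional


class CogneeClientSketch:
    """Hypothetical sketch of the client dispatch; not the real CogneeClient."""

    def __init__(self, api_url: Optional[str] = None, api_token: Optional[str] = None):
        self.api_url = api_url
        self.api_token = api_token
        # API mode is selected simply by whether a base URL was provided
        self.use_api = api_url is not None

    async def search(self, query_text: str, query_type: str):
        if self.use_api:
            import httpx  # imported lazily; only needed in API mode

            headers = {"Authorization": f"Bearer {self.api_token}"} if self.api_token else {}
            async with httpx.AsyncClient(base_url=self.api_url, headers=headers) as client:
                response = await client.post(
                    "/api/v1/search",  # placeholder path, not verified against cognee's API
                    json={"query": query_text, "search_type": query_type},
                )
                response.raise_for_status()
                return response.json()
        # Direct mode would call cognee.search(...) in-process here
        raise NotImplementedError("direct mode not shown in this sketch")
```

This explains why migrations are skipped under `--api-url`: in API mode the MCP process owns no database, so the remote server is the only one that should run Alembic.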

cognee-mcp/uv.lock (generated)

@@ -1,5 +1,5 @@
 version = 1
-revision = 2
+revision = 3
 requires-python = ">=3.10"
 resolution-markers = [
     "python_full_version >= '3.14' and platform_python_implementation != 'PyPy' and sys_platform != 'emscripten'",
@@ -737,6 +737,7 @@ source = { editable = "." }
 dependencies = [
     { name = "cognee", extra = ["codegraph", "docs", "gemini", "huggingface", "neo4j", "postgres"] },
     { name = "fastmcp" },
+    { name = "httpx" },
     { name = "mcp" },
     { name = "uv" },
 ]
@@ -750,6 +751,7 @@ dev = [
 requires-dist = [
     { name = "cognee", extras = ["postgres", "codegraph", "gemini", "huggingface", "docs", "neo4j"], specifier = "==0.3.4" },
     { name = "fastmcp", specifier = ">=2.10.0,<3.0.0" },
+    { name = "httpx", specifier = ">=0.27.0,<1.0.0" },
     { name = "mcp", specifier = ">=1.12.0,<2.0.0" },
     { name = "uv", specifier = ">=0.6.3,<1.0.0" },
 ]
@@ -1026,7 +1028,7 @@ version = "3.24.0"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
     { name = "attrs" },
-    { name = "docstring-parser", marker = "python_full_version < '4.0'" },
+    { name = "docstring-parser", marker = "python_full_version < '4'" },
     { name = "rich" },
     { name = "rich-rst" },
     { name = "typing-extensions", marker = "python_full_version < '3.11'" },
@@ -1309,17 +1311,17 @@ name = "fastembed"
 version = "0.6.0"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
-    { name = "huggingface-hub" },
-    { name = "loguru" },
-    { name = "mmh3" },
+    { name = "huggingface-hub", marker = "python_full_version < '3.13'" },
+    { name = "loguru", marker = "python_full_version < '3.13'" },
+    { name = "mmh3", marker = "python_full_version < '3.13'" },
     { name = "numpy", version = "2.2.6", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version < '3.11'" },
-    { name = "numpy", version = "2.3.3", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11'" },
-    { name = "onnxruntime" },
-    { name = "pillow" },
-    { name = "py-rust-stemmers" },
-    { name = "requests" },
-    { name = "tokenizers" },
-    { name = "tqdm" },
+    { name = "numpy", version = "2.3.3", source = { registry = "https://pypi.org/simple" }, marker = "python_full_version >= '3.11' and python_full_version < '3.13'" },
+    { name = "onnxruntime", marker = "python_full_version < '3.13'" },
+    { name = "pillow", marker = "python_full_version < '3.13'" },
+    { name = "py-rust-stemmers", marker = "python_full_version < '3.13'" },
+    { name = "requests", marker = "python_full_version < '3.13'" },
+    { name = "tokenizers", marker = "python_full_version < '3.13'" },
+    { name = "tqdm", marker = "python_full_version < '3.13'" },
 ]
 sdist = { url = "https://files.pythonhosted.org/packages/c6/f4/036a656c605f63dc25f11284f60f69900a54a19c513e1ae60d21d6977e75/fastembed-0.6.0.tar.gz", hash = "sha256:5c9ead25f23449535b07243bbe1f370b820dcc77ec2931e61674e3fe7ff24733", size = 50731, upload-time = "2025-02-26T13:50:33.031Z" }
 wheels = [
@@ -2526,8 +2528,8 @@ name = "loguru"
 version = "0.7.3"
 source = { registry = "https://pypi.org/simple" }
 dependencies = [
-    { name = "colorama", marker = "sys_platform == 'win32'" },
-    { name = "win32-setctime", marker = "sys_platform == 'win32'" },
+    { name = "colorama", marker = "python_full_version < '3.13' and sys_platform == 'win32'" },
+    { name = "win32-setctime", marker = "python_full_version < '3.13' and sys_platform == 'win32'" },
 ]
 sdist = { url = "https://files.pythonhosted.org/packages/3a/05/a1dae3dffd1116099471c643b8924f5aa6524411dc6c63fdae648c4f1aca/loguru-0.7.3.tar.gz", hash = "sha256:19480589e77d47b8d85b2c827ad95d49bf31b0dcde16593892eb51dd18706eb6", size = 63559, upload-time = "2024-12-06T11:20:56.608Z" }
 wheels = [


@@ -502,24 +502,48 @@ def start_ui(
     if start_mcp:
         logger.info("Starting Cognee MCP server with Docker...")

-        cwd = os.getcwd()
-        env_file = os.path.join(cwd, ".env")
-
         try:
-            image = "cognee/cognee-mcp:main"
+            image = "cognee/cognee-mcp:feature-standalone-mcp"  # TODO: change to "cognee/cognee-mcp:main" right before merging into main
             subprocess.run(["docker", "pull", image], check=True)

+            import uuid
+
+            container_name = f"cognee-mcp-{uuid.uuid4().hex[:8]}"
+
+            docker_cmd = [
+                "docker",
+                "run",
+                "--name",
+                container_name,
+                "-p",
+                f"{mcp_port}:8000",
+                "--rm",
+                "-e",
+                "TRANSPORT_MODE=sse",
+            ]
+
+            if start_backend:
+                docker_cmd.extend(
+                    [
+                        "-e",
+                        f"API_URL=http://localhost:{backend_port}",
+                    ]
+                )
+                logger.info(
+                    f"Configuring MCP to connect to backend API at http://localhost:{backend_port}"
+                )
+                logger.info("(localhost will be auto-converted to host.docker.internal)")
+            else:
+                cwd = os.getcwd()
+                env_file = os.path.join(cwd, ".env")
+                docker_cmd.extend(["--env-file", env_file])
+
+            docker_cmd.append(
+                image
+            )  # TODO: change to "cognee/cognee-mcp:main" right before merging into main
+
             mcp_process = subprocess.Popen(
-                [
-                    "docker",
-                    "run",
-                    "-p",
-                    f"{mcp_port}:8000",
-                    "--rm",
-                    "--env-file",
-                    env_file,
-                    "-e",
-                    "TRANSPORT_MODE=sse",
-                    "cognee/cognee-mcp:main",
-                ],
+                docker_cmd,
                 stdout=subprocess.PIPE,
                 stderr=subprocess.PIPE,
                 preexec_fn=os.setsid if hasattr(os, "setsid") else None,
@@ -528,8 +552,13 @@ def start_ui(
             _stream_process_output(mcp_process, "stdout", "[MCP]", "\033[34m")  # Blue
             _stream_process_output(mcp_process, "stderr", "[MCP]", "\033[34m")  # Blue

-            pid_callback(mcp_process.pid)
-            logger.info(f"✓ Cognee MCP server starting on http://127.0.0.1:{mcp_port}/sse")
+            # Pass both PID and container name using a tuple
+            pid_callback((mcp_process.pid, container_name))
+            mode_info = "API mode" if start_backend else "direct mode"
+            logger.info(
+                f"✓ Cognee MCP server starting on http://127.0.0.1:{mcp_port}/sse ({mode_info})"
+            )
         except Exception as e:
             logger.error(f"Failed to start MCP server with Docker: {str(e)}")

     # Start backend server if requested
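The conditional `docker run` assembly above can be factored into a small helper to make the two modes easier to compare. The helper itself is hypothetical; the flags mirror the diff:

```python
import uuid


def build_mcp_docker_cmd(
    image: str, mcp_port: int, backend_port: int, start_backend: bool, env_file: str
) -> list:
    """Assemble the docker run command for the MCP container in either mode."""
    container_name = f"cognee-mcp-{uuid.uuid4().hex[:8]}"
    cmd = [
        "docker", "run",
        "--name", container_name,
        "-p", f"{mcp_port}:8000",
        "--rm",
        "-e", "TRANSPORT_MODE=sse",
    ]
    if start_backend:
        # API mode: point the MCP container at the already running backend
        cmd += ["-e", f"API_URL=http://localhost:{backend_port}"]
    else:
        # Direct mode: the container needs the local .env for its own cognee instance
        cmd += ["--env-file", env_file]
    cmd.append(image)
    return cmd
```

Naming the container (`--name` plus a random suffix) is what makes the later `docker stop <container_name>` cleanup in the signal handler possible; a bare PID is not enough to stop a detached container cleanly.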


@@ -175,19 +175,59 @@ def main() -> int:
     # Handle UI flag
     if hasattr(args, "start_ui") and args.start_ui:
         spawned_pids = []
+        docker_container = None

         def signal_handler(signum, frame):
             """Handle Ctrl+C and other termination signals"""
-            nonlocal spawned_pids
-            fmt.echo("\nShutting down UI server...")
+            nonlocal spawned_pids, docker_container
+            try:
+                fmt.echo("\nShutting down UI server...")
+            except (BrokenPipeError, OSError):
+                pass
+
+            # First, stop Docker container if running
+            if docker_container:
+                try:
+                    result = subprocess.run(
+                        ["docker", "stop", docker_container],
+                        capture_output=True,
+                        timeout=10,
+                        check=False,
+                    )
+                    try:
+                        if result.returncode == 0:
+                            fmt.success(f"✓ Docker container {docker_container} stopped.")
+                        else:
+                            fmt.warning(
+                                f"Could not stop container {docker_container}: {result.stderr.decode()}"
+                            )
+                    except (BrokenPipeError, OSError):
+                        pass
+                except subprocess.TimeoutExpired:
+                    try:
+                        fmt.warning(
+                            f"Timeout stopping container {docker_container}, forcing removal..."
+                        )
+                    except (BrokenPipeError, OSError):
+                        pass
+                    subprocess.run(
+                        ["docker", "rm", "-f", docker_container], capture_output=True, check=False
+                    )
+                except Exception:
+                    pass
+
+            # Then, stop regular processes
             for pid in spawned_pids:
                 try:
                     if hasattr(os, "killpg"):
                         # Unix-like systems: Use process groups
                         pgid = os.getpgid(pid)
                         os.killpg(pgid, signal.SIGTERM)
-                        fmt.success(f"✓ Process group {pgid} (PID {pid}) terminated.")
+                        try:
+                            fmt.success(f"✓ Process group {pgid} (PID {pid}) terminated.")
+                        except (BrokenPipeError, OSError):
+                            pass
                     else:
                         # Windows: Use taskkill to terminate process and its children
                         subprocess.run(
@@ -195,24 +235,35 @@ def main() -> int:
                             capture_output=True,
                             check=False,
                         )
-                        fmt.success(f"✓ Process {pid} and its children terminated.")
-                except (OSError, ProcessLookupError, subprocess.SubprocessError) as e:
-                    fmt.warning(f"Could not terminate process {pid}: {e}")
+                        try:
+                            fmt.success(f"✓ Process {pid} and its children terminated.")
+                        except (BrokenPipeError, OSError):
+                            pass
+                except (OSError, ProcessLookupError, subprocess.SubprocessError):
+                    pass
             sys.exit(0)

         signal.signal(signal.SIGINT, signal_handler)  # Ctrl+C
         signal.signal(signal.SIGTERM, signal_handler)  # Termination request
+        if hasattr(signal, "SIGHUP"):
+            signal.signal(signal.SIGHUP, signal_handler)

         try:
             from cognee import start_ui

             fmt.echo("Starting cognee UI...")

-            # Callback to capture PIDs of all spawned processes
-            def pid_callback(pid):
-                nonlocal spawned_pids
-                spawned_pids.append(pid)
+            # Callback to capture PIDs and Docker container of all spawned processes
+            def pid_callback(pid_or_tuple):
+                nonlocal spawned_pids, docker_container
+                # Handle both regular PIDs and (PID, container_name) tuples
+                if isinstance(pid_or_tuple, tuple):
+                    pid, container_name = pid_or_tuple
+                    spawned_pids.append(pid)
+                    docker_container = container_name
+                else:
+                    spawned_pids.append(pid_or_tuple)

             frontend_port = 3000
             start_backend, backend_port = True, 8000