Merge branch 'dev' into COG-2082

Vasilije 2025-07-29 09:39:15 +02:00 committed by GitHub
commit 190c7eea88
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 4924 additions and 3918 deletions


@ -17,10 +17,15 @@ secret-scan:
# Ignore by commit (if needed)
excluded-commits:
- '782bbb4'
- 'f857e07'
# Custom rules for template files
paths-ignore:
- path: '.env.template'
comment: 'Template file with placeholder values'
- path: '.github/workflows/search_db_tests.yml'
comment: 'Test workflow with test credentials'
comment: 'Test workflow with test credentials'
- path: 'docker-compose.yml'
comment: 'Development docker compose with test credentials (neo4j/pleaseletmein, postgres cognee/cognee)'
- path: 'deployment/helm/docker-compose-helm.yml'
comment: 'Helm deployment docker compose with test postgres credentials (cognee/cognee)'

.gitignore (vendored): 1 change

@ -37,6 +37,7 @@ share/python-wheels/
.installed.cfg
*.egg
.python-version
cognee-mcp/.python-version
MANIFEST
# PyInstaller


@ -0,0 +1,75 @@
"""kuzu-11-migration
Revision ID: b9274c27a25a
Revises: e4ebee1091e7
Create Date: 2025-07-24 17:11:52.174737
"""
import os
from typing import Sequence, Union
from cognee.infrastructure.databases.graph.kuzu.kuzu_migrate import (
kuzu_migration,
read_kuzu_storage_version,
)
import kuzu
# revision identifiers, used by Alembic.
revision: str = "b9274c27a25a"
down_revision: Union[str, None] = "e4ebee1091e7"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
# This migration is only for multi-user Cognee mode
if os.getenv("ENABLE_BACKEND_ACCESS_CONTROL", "false").lower() == "true":
from cognee.base_config import get_base_config
base_config = get_base_config()
databases_root = os.path.join(base_config.system_root_directory, "databases")
if not os.path.isdir(databases_root):
raise FileNotFoundError(f"Directory not found: {databases_root}")
for current_path, dirnames, _ in os.walk(databases_root):
# If file is kuzu graph database
if ".pkl" in current_path[-4:]:
kuzu_db_version = read_kuzu_storage_version(current_path)
if (
kuzu_db_version == "0.9.0" or kuzu_db_version == "0.8.2"
) and kuzu_db_version != kuzu.__version__:
# Try to migrate kuzu database to latest version
kuzu_migration(
new_db=current_path + "_new",
old_db=current_path,
new_version=kuzu.__version__,
old_version=kuzu_db_version,
overwrite=True,
)
else:
from cognee.infrastructure.databases.graph import get_graph_config
graph_config = get_graph_config()
if graph_config.graph_database_provider.lower() == "kuzu":
if os.path.exists(graph_config.graph_file_path):
kuzu_db_version = read_kuzu_storage_version(graph_config.graph_file_path)
if (
kuzu_db_version == "0.9.0" or kuzu_db_version == "0.8.2"
) and kuzu_db_version != kuzu.__version__:
# Try to migrate kuzu database to latest version
kuzu_migration(
new_db=graph_config.graph_file_path + "_new",
old_db=graph_config.graph_file_path,
new_version=kuzu.__version__,
old_version=kuzu_db_version,
overwrite=True,
)
def downgrade() -> None:
# To downgrade, manually rename the backed-up old Kuzu graph databases
# stored in the user folder back to their previous names, and remove the new
# Kuzu graph databases that replaced them.
pass
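
A hedged sketch of that manual downgrade for a single file-based database follows. The backup filename is an assumption (check what kuzu_migrate.rename_databases actually wrote next to your database); the paths are illustrative.

```python
import os
import shutil

# Assumed layout after a migration with overwrite=True. The backup suffix below is
# an assumption for illustration; use the actual backup name found in your
# databases folder.
graph_db = "/path/to/databases/cognee_graph.pkl"              # migrated (new) database
backup_db = "/path/to/databases/cognee_graph.pkl_old_0.9.0"   # pre-migration backup (assumed name)

# Remove the migrated database and its WAL file, if present ...
for ext in ("", ".wal"):
    if os.path.exists(graph_db + ext):
        os.remove(graph_db + ext)

# ... then restore the backup under the original name.
for ext in ("", ".wal"):
    if os.path.exists(backup_db + ext):
        shutil.move(backup_db + ext, graph_db + ext)
```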


@ -0,0 +1,140 @@
"""Expand data model info
Revision ID: e4ebee1091e7
Revises: ab7e313804ae
Create Date: 2025-07-24 13:21:30.738486
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects import postgresql
# revision identifiers, used by Alembic.
revision: str = "e4ebee1091e7"
down_revision: Union[str, None] = "ab7e313804ae"
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def _get_column(inspector, table, name, schema=None):
for col in inspector.get_columns(table, schema=schema):
if col["name"] == name:
return col
return None
def _index_exists(inspector, table, name, schema=None):
return any(ix["name"] == name for ix in inspector.get_indexes(table, schema=schema))
def upgrade() -> None:
TABLES_TO_DROP = [
"file_metadata",
"_dlt_loads",
"_dlt_version",
"_dlt_pipeline_state",
]
conn = op.get_bind()
insp = sa.inspect(conn)
existing = set(insp.get_table_names())
for tbl in TABLES_TO_DROP:
if tbl in existing:
op.drop_table(tbl)
DATA_TABLE = "data"
DATA_TENANT_COL = "tenant_id"
DATA_SIZE_COL = "data_size"
DATA_TENANT_IDX = "ix_data_tenant_id"
# --- tenant_id ---
col = _get_column(insp, DATA_TABLE, DATA_TENANT_COL)
if col is None:
op.add_column(
DATA_TABLE,
sa.Column(DATA_TENANT_COL, postgresql.UUID(as_uuid=True), nullable=True),
)
else:
# Column exists; fix nullability if needed
if col.get("nullable", True) is False:
op.alter_column(
DATA_TABLE,
DATA_TENANT_COL,
existing_type=postgresql.UUID(as_uuid=True),
nullable=True,
)
# --- data_size ---
col = _get_column(insp, DATA_TABLE, DATA_SIZE_COL)
if col is None:
op.add_column(DATA_TABLE, sa.Column(DATA_SIZE_COL, sa.Integer(), nullable=True))
else:
# If you also need to change nullability for data_size, do it here
if col.get("nullable", True) is False:
op.alter_column(
DATA_TABLE,
DATA_SIZE_COL,
existing_type=sa.Integer(),
nullable=True,
)
# --- index on tenant_id ---
if not _index_exists(insp, DATA_TABLE, DATA_TENANT_IDX):
op.create_index(DATA_TENANT_IDX, DATA_TABLE, [DATA_TENANT_COL], unique=False)
def downgrade() -> None:
op.drop_index(op.f("ix_data_tenant_id"), table_name="data")
op.drop_column("data", "data_size")
op.drop_column("data", "tenant_id")
op.create_table(
"_dlt_pipeline_state",
sa.Column("version", sa.BIGINT(), autoincrement=False, nullable=False),
sa.Column("engine_version", sa.BIGINT(), autoincrement=False, nullable=False),
sa.Column("pipeline_name", sa.TEXT(), autoincrement=False, nullable=False),
sa.Column("state", sa.TEXT(), autoincrement=False, nullable=False),
sa.Column(
"created_at", postgresql.TIMESTAMP(timezone=True), autoincrement=False, nullable=False
),
sa.Column("version_hash", sa.TEXT(), autoincrement=False, nullable=True),
sa.Column("_dlt_load_id", sa.TEXT(), autoincrement=False, nullable=False),
sa.Column("_dlt_id", sa.VARCHAR(length=128), autoincrement=False, nullable=False),
)
op.create_table(
"_dlt_version",
sa.Column("version", sa.BIGINT(), autoincrement=False, nullable=False),
sa.Column("engine_version", sa.BIGINT(), autoincrement=False, nullable=False),
sa.Column(
"inserted_at", postgresql.TIMESTAMP(timezone=True), autoincrement=False, nullable=False
),
sa.Column("schema_name", sa.TEXT(), autoincrement=False, nullable=False),
sa.Column("version_hash", sa.TEXT(), autoincrement=False, nullable=False),
sa.Column("schema", sa.TEXT(), autoincrement=False, nullable=False),
)
op.create_table(
"_dlt_loads",
sa.Column("load_id", sa.TEXT(), autoincrement=False, nullable=False),
sa.Column("schema_name", sa.TEXT(), autoincrement=False, nullable=True),
sa.Column("status", sa.BIGINT(), autoincrement=False, nullable=False),
sa.Column(
"inserted_at", postgresql.TIMESTAMP(timezone=True), autoincrement=False, nullable=False
),
sa.Column("schema_version_hash", sa.TEXT(), autoincrement=False, nullable=True),
)
op.create_table(
"file_metadata",
sa.Column("id", sa.TEXT(), autoincrement=False, nullable=False),
sa.Column("name", sa.TEXT(), autoincrement=False, nullable=True),
sa.Column("file_path", sa.TEXT(), autoincrement=False, nullable=True),
sa.Column("extension", sa.TEXT(), autoincrement=False, nullable=True),
sa.Column("mime_type", sa.TEXT(), autoincrement=False, nullable=True),
sa.Column("content_hash", sa.TEXT(), autoincrement=False, nullable=True),
sa.Column("owner_id", sa.TEXT(), autoincrement=False, nullable=True),
sa.Column("_dlt_load_id", sa.TEXT(), autoincrement=False, nullable=False),
sa.Column("_dlt_id", sa.VARCHAR(length=128), autoincrement=False, nullable=False),
sa.Column("node_set", sa.TEXT(), autoincrement=False, nullable=True),
)
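
As a quick sanity check after running this migration, the same inspection calls used by the helpers above can verify the outcome; a minimal sketch, with a placeholder connection URL:

```python
# Minimal post-migration check: the columns and index managed by this revision.
# The connection URL is a placeholder; adjust it to your Postgres instance.
import sqlalchemy as sa

engine = sa.create_engine("postgresql://cognee:cognee@localhost:5432/cognee_db")
insp = sa.inspect(engine)

columns = {col["name"]: col for col in insp.get_columns("data")}
indexes = {ix["name"] for ix in insp.get_indexes("data")}

assert "tenant_id" in columns and columns["tenant_id"]["nullable"]
assert "data_size" in columns and columns["data_size"]["nullable"]
assert "ix_data_tenant_id" in indexes
print("data.tenant_id, data.data_size and ix_data_tenant_id look as expected")
```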


@ -1 +0,0 @@
3.11.5


@ -37,7 +37,7 @@ Build memory for Agents and query from any client that speaks MCP  in your t
## ✨ Features
- SSE & stdio transports – choose realtime streaming `--transport sse` or the classic stdio pipe
- Multiple transports – choose Streamable HTTP `--transport http` (recommended for web deployments), SSE `--transport sse` (realtime streaming), or stdio (classic pipe, default)
- Integrated logging – all actions written to a rotating file (see get_log_file_location()) and mirrored to console in dev
- Local file ingestion – feed .md, source files, Cursor rulesets, etc. straight from disk
- Background pipelines – long-running cognify & codify jobs spawn off-thread; check progress with status tools
@ -80,6 +80,10 @@ Please refer to our documentation [here](https://docs.cognee.ai/how-to-guides/de
```
python src/server.py --transport sse
```
or run with Streamable HTTP transport (recommended for web deployments)
```
python src/server.py --transport http --host 127.0.0.1 --port 8000 --path /mcp
```
You can do more advanced configurations by creating a .env file using our <a href="https://github.com/topoteretes/cognee/blob/main/.env.template">template</a>.
To use different LLM providers or database configurations, and for more info, check out our <a href="https://docs.cognee.ai">documentation</a>.
@ -98,12 +102,21 @@ If you'd rather run cognee-mcp in a container, you have two options:
```
3. Run it:
```bash
docker run --env-file ./.env -p 8000:8000 --rm -it cognee/cognee-mcp:main
# For HTTP transport (recommended for web deployments)
docker run --env-file ./.env -p 8000:8000 --rm -it cognee/cognee-mcp:main --transport http
# For SSE transport
docker run --env-file ./.env -p 8000:8000 --rm -it cognee/cognee-mcp:main --transport sse
# For stdio transport (default)
docker run --env-file ./.env --rm -it cognee/cognee-mcp:main
```
2. **Pull from Docker Hub** (no build required):
```bash
# With your .env file
docker run --env-file ./.env -p 8000:8000 --rm -it cognee/cognee-mcp:main
# With HTTP transport (recommended for web deployments)
docker run --env-file ./.env -p 8000:8000 --rm -it cognee/cognee-mcp:main --transport http
# With SSE transport
docker run --env-file ./.env -p 8000:8000 --rm -it cognee/cognee-mcp:main --transport sse
# With stdio transport (default)
docker run --env-file ./.env --rm -it cognee/cognee-mcp:main
## 💻 Basic Usage
@ -113,15 +126,34 @@ The MCP server exposes its functionality through tools. Call them from any MCP c
### Available Tools
- cognify: Turns your data into a structured knowledge graph and stores it in memory
- **cognify**: Turns your data into a structured knowledge graph and stores it in memory
- codify: Analyses a code repository, builds a code graph, and stores it in memory
- **codify**: Analyses a code repository, builds a code graph, and stores it in memory
- search: Query memory – supports GRAPH_COMPLETION, RAG_COMPLETION, CODE, CHUNKS, INSIGHTS
- **search**: Query memory – supports GRAPH_COMPLETION, RAG_COMPLETION, CODE, CHUNKS, INSIGHTS
- prune: Reset cognee for a fresh start
- **list_data**: List all datasets and their data items with IDs for deletion operations
- cognify_status / codify_status: Track pipeline progress
- **delete**: Delete specific data from a dataset (supports soft/hard deletion modes)
- **prune**: Reset cognee for a fresh start (removes all data)
- **cognify_status / codify_status**: Track pipeline progress
**Data Management Examples:**
```bash
# List all available datasets and data items
list_data()
# List data items in a specific dataset
list_data(dataset_id="your-dataset-id-here")
# Delete specific data (soft deletion - safer, preserves shared entities)
delete(data_id="data-uuid", dataset_id="dataset-uuid", mode="soft")
# Delete specific data (hard deletion - removes orphaned entities)
delete(data_id="data-uuid", dataset_id="dataset-uuid", mode="hard")
```
Remember – use the CODE search type to query your code graph. For huge repos, run codify on modules incrementally and cache results.
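
For a programmatic client, here is a minimal sketch using the MCP Python SDK over stdio, mirroring the patterns in src/test_client.py; the query string is illustrative and the server is assumed to be started from the cognee-mcp folder.

```python
# Minimal sketch: call the search tool with the CODE search type over stdio.
import asyncio

from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


async def code_search_example() -> None:
    server_params = StdioServerParameters(
        command="python",
        args=["src/server.py", "--transport", "stdio"],
    )
    async with stdio_client(server_params) as (read, write):
        async with ClientSession(read, write) as session:
            await session.initialize()
            result = await session.call_tool(
                "search",
                arguments={
                    "search_query": "Where is the graph adapter initialized?",  # illustrative query
                    "search_type": "CODE",
                },
            )
            print(result.content[0].text)


asyncio.run(code_search_example())
```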


@ -8,6 +8,12 @@ echo "Environment: $ENVIRONMENT"
TRANSPORT_MODE=${TRANSPORT_MODE:-"stdio"}
echo "Transport mode: $TRANSPORT_MODE"
# Set default ports if not specified
DEBUG_PORT=${DEBUG_PORT:-5678}
HTTP_PORT=${HTTP_PORT:-8000}
echo "Debug port: $DEBUG_PORT"
echo "HTTP port: $HTTP_PORT"
# Run Alembic migrations with proper error handling.
# Note on UserAlreadyExists error handling:
# During database migrations, we attempt to create a default user. If this user
@ -42,13 +48,17 @@ if [ "$ENVIRONMENT" = "dev" ] || [ "$ENVIRONMENT" = "local" ]; then
if [ "$DEBUG" = "true" ]; then
echo "Waiting for the debugger to attach..."
if [ "$TRANSPORT_MODE" = "sse" ]; then
exec python -m debugpy --wait-for-client --listen 0.0.0.0:5678 -m cognee --transport sse
exec python -m debugpy --wait-for-client --listen 0.0.0.0:$DEBUG_PORT -m cognee --transport sse
elif [ "$TRANSPORT_MODE" = "http" ]; then
exec python -m debugpy --wait-for-client --listen 0.0.0.0:$DEBUG_PORT -m cognee --transport http --host 0.0.0.0 --port $HTTP_PORT
else
exec python -m debugpy --wait-for-client --listen 0.0.0.0:5678 -m cognee --transport stdio
exec python -m debugpy --wait-for-client --listen 0.0.0.0:$DEBUG_PORT -m cognee --transport stdio
fi
else
if [ "$TRANSPORT_MODE" = "sse" ]; then
exec cognee --transport sse
elif [ "$TRANSPORT_MODE" = "http" ]; then
exec cognee --transport http --host 0.0.0.0 --port $HTTP_PORT
else
exec cognee --transport stdio
fi
@ -56,6 +66,8 @@ if [ "$ENVIRONMENT" = "dev" ] || [ "$ENVIRONMENT" = "local" ]; then
else
if [ "$TRANSPORT_MODE" = "sse" ]; then
exec cognee --transport sse
elif [ "$TRANSPORT_MODE" = "http" ]; then
exec cognee --transport http --host 0.0.0.0 --port $HTTP_PORT
else
exec cognee --transport stdio
fi


@ -7,10 +7,10 @@ requires-python = ">=3.10"
dependencies = [
# For local cognee repo usage, remove the comment below and add the absolute path to cognee. Then run `uv sync --reinstall` in the mcp folder on local cognee changes.
#"cognee[postgres,codegraph,gemini,huggingface,docs,neo4j] @ file:/Users/<username>/Desktop/cognee",
# "cognee[postgres,codegraph,gemini,huggingface,docs,neo4j] @ file:/Users/vasilije/Projects/tiktok/cognee",
"cognee[postgres,codegraph,gemini,huggingface,docs,neo4j]>=0.2.0,<1.0.0",
"fastmcp>=1.0,<2.0.0",
"mcp>=1.11.0,<2.0.0",
"fastmcp>=2.10.0,<3.0.0",
"mcp>=1.12.0,<2.0.0",
"uv>=0.6.3,<1.0.0",
]


@ -4,6 +4,8 @@ import sys
import argparse
import cognee
import asyncio
import subprocess
from pathlib import Path
from cognee.shared.logging_utils import get_logger, setup_logging, get_log_file_location
import importlib.util
@ -378,7 +380,7 @@ async def search(search_query: str, search_type: str) -> list:
elif (
search_type.upper() == "GRAPH_COMPLETION" or search_type.upper() == "RAG_COMPLETION"
):
return search_results[0]
return str(search_results[0])
elif search_type.upper() == "CHUNKS":
return str(search_results)
elif search_type.upper() == "INSIGHTS":
@ -426,6 +428,209 @@ async def get_developer_rules() -> list:
return [types.TextContent(type="text", text=rules_text)]
@mcp.tool()
async def list_data(dataset_id: str = None) -> list:
"""
List all datasets and their data items with IDs for deletion operations.
This function helps users identify data IDs and dataset IDs that can be used
with the delete tool. It provides a comprehensive view of available data.
Parameters
----------
dataset_id : str, optional
If provided, only list data items from this specific dataset.
If None, lists all datasets and their data items.
Should be a valid UUID string.
Returns
-------
list
A list containing a single TextContent object with formatted information
about datasets and data items, including their IDs for deletion.
Notes
-----
- Use this tool to identify data_id and dataset_id values for the delete tool
- The output includes both dataset information and individual data items
- UUIDs are displayed in a format ready for use with other tools
"""
from uuid import UUID
with redirect_stdout(sys.stderr):
try:
user = await get_default_user()
output_lines = []
if dataset_id:
# List data for specific dataset
logger.info(f"Listing data for dataset: {dataset_id}")
dataset_uuid = UUID(dataset_id)
# Get the dataset information
from cognee.modules.data.methods import get_dataset, get_dataset_data
dataset = await get_dataset(user.id, dataset_uuid)
if not dataset:
return [
types.TextContent(type="text", text=f"❌ Dataset not found: {dataset_id}")
]
# Get data items in the dataset
data_items = await get_dataset_data(dataset.id)
output_lines.append(f"📁 Dataset: {dataset.name}")
output_lines.append(f" ID: {dataset.id}")
output_lines.append(f" Created: {dataset.created_at}")
output_lines.append(f" Data items: {len(data_items)}")
output_lines.append("")
if data_items:
for i, data_item in enumerate(data_items, 1):
output_lines.append(f" 📄 Data item #{i}:")
output_lines.append(f" Data ID: {data_item.id}")
output_lines.append(f" Name: {data_item.name or 'Unnamed'}")
output_lines.append(f" Created: {data_item.created_at}")
output_lines.append("")
else:
output_lines.append(" (No data items in this dataset)")
else:
# List all datasets
logger.info("Listing all datasets")
from cognee.modules.data.methods import get_datasets
datasets = await get_datasets(user.id)
if not datasets:
return [
types.TextContent(
type="text",
text="📂 No datasets found.\nUse the cognify tool to create your first dataset!",
)
]
output_lines.append("📂 Available Datasets:")
output_lines.append("=" * 50)
output_lines.append("")
for i, dataset in enumerate(datasets, 1):
# Get data count for each dataset
from cognee.modules.data.methods import get_dataset_data
data_items = await get_dataset_data(dataset.id)
output_lines.append(f"{i}. 📁 {dataset.name}")
output_lines.append(f" Dataset ID: {dataset.id}")
output_lines.append(f" Created: {dataset.created_at}")
output_lines.append(f" Data items: {len(data_items)}")
output_lines.append("")
output_lines.append("💡 To see data items in a specific dataset, use:")
output_lines.append(' list_data(dataset_id="your-dataset-id-here")')
output_lines.append("")
output_lines.append("🗑️ To delete specific data, use:")
output_lines.append(' delete(data_id="data-id", dataset_id="dataset-id")')
result_text = "\n".join(output_lines)
logger.info("List data operation completed successfully")
return [types.TextContent(type="text", text=result_text)]
except ValueError as e:
error_msg = f"❌ Invalid UUID format: {str(e)}"
logger.error(error_msg)
return [types.TextContent(type="text", text=error_msg)]
except Exception as e:
error_msg = f"❌ Failed to list data: {str(e)}"
logger.error(f"List data error: {str(e)}")
return [types.TextContent(type="text", text=error_msg)]
@mcp.tool()
async def delete(data_id: str, dataset_id: str, mode: str = "soft") -> list:
"""
Delete specific data from a dataset in the Cognee knowledge graph.
This function removes a specific data item from a dataset while keeping the
dataset itself intact. It supports both soft and hard deletion modes.
Parameters
----------
data_id : str
The UUID of the data item to delete from the knowledge graph.
This should be a valid UUID string identifying the specific data item.
dataset_id : str
The UUID of the dataset containing the data to be deleted.
This should be a valid UUID string identifying the dataset.
mode : str, optional
The deletion mode to use. Options are:
- "soft" (default): Removes the data but keeps related entities that might be shared
- "hard": Also removes degree-one entity nodes that become orphaned after deletion
Default is "soft" for safer deletion that preserves shared knowledge.
Returns
-------
list
A list containing a single TextContent object with the deletion results,
including status, deleted node counts, and confirmation details.
Notes
-----
- This operation cannot be undone. The specified data will be permanently removed.
- Hard mode may remove additional entity nodes that become orphaned
- The function provides detailed feedback about what was deleted
- Use this for targeted deletion instead of the prune tool which removes everything
"""
from uuid import UUID
with redirect_stdout(sys.stderr):
try:
logger.info(
f"Starting delete operation for data_id: {data_id}, dataset_id: {dataset_id}, mode: {mode}"
)
# Convert string UUIDs to UUID objects
data_uuid = UUID(data_id)
dataset_uuid = UUID(dataset_id)
# Get default user for the operation
user = await get_default_user()
# Call the cognee delete function
result = await cognee.delete(
data_id=data_uuid, dataset_id=dataset_uuid, mode=mode, user=user
)
logger.info(f"Delete operation completed successfully: {result}")
# Format the result for MCP response
formatted_result = json.dumps(result, indent=2, cls=JSONEncoder)
return [
types.TextContent(
type="text",
text=f"✅ Delete operation completed successfully!\n\n{formatted_result}",
)
]
except ValueError as e:
# Handle UUID parsing errors
error_msg = f"❌ Invalid UUID format: {str(e)}"
logger.error(error_msg)
return [types.TextContent(type="text", text=error_msg)]
except Exception as e:
# Handle all other errors (DocumentNotFoundError, DatasetNotFoundError, etc.)
error_msg = f"❌ Delete operation failed: {str(e)}"
logger.error(f"Delete operation error: {str(e)}")
return [types.TextContent(type="text", text=error_msg)]
@mcp.tool()
async def prune():
"""
@ -545,21 +750,74 @@ async def main():
parser.add_argument(
"--transport",
choices=["sse", "stdio"],
choices=["sse", "stdio", "http"],
default="stdio",
help="Transport to use for communication with the client. (default: stdio)",
)
# HTTP transport options
parser.add_argument(
"--host",
default="127.0.0.1",
help="Host to bind the HTTP server to (default: 127.0.0.1)",
)
parser.add_argument(
"--port",
type=int,
default=8000,
help="Port to bind the HTTP server to (default: 8000)",
)
parser.add_argument(
"--path",
default="/mcp",
help="Path for the MCP HTTP endpoint (default: /mcp)",
)
parser.add_argument(
"--log-level",
default="info",
choices=["debug", "info", "warning", "error"],
help="Log level for the HTTP server (default: info)",
)
args = parser.parse_args()
# Run Alembic migrations from the main cognee directory where alembic.ini is located
print("Running database migrations...")
migration_result = subprocess.run(
["python", "-m", "alembic", "upgrade", "head"],
capture_output=True,
text=True,
cwd=Path(__file__).resolve().parent.parent.parent,
)
if migration_result.returncode != 0:
migration_output = migration_result.stderr + migration_result.stdout
# Check for the expected UserAlreadyExists error (which is not critical)
if (
"UserAlreadyExists" in migration_output
or "User default_user@example.com already exists" in migration_output
):
print("Warning: Default user already exists, continuing startup...")
else:
print(f"Migration failed with unexpected error: {migration_output}")
sys.exit(1)
print("Database migrations done.")
logger.info(f"Starting MCP server with transport: {args.transport}")
if args.transport == "stdio":
await mcp.run_stdio_async()
elif args.transport == "sse":
logger.info(
f"Running MCP server with SSE transport on {mcp.settings.host}:{mcp.settings.port}"
)
logger.info(f"Running MCP server with SSE transport on {args.host}:{args.port}")
await mcp.run_sse_async()
elif args.transport == "http":
logger.info(
f"Running MCP server with Streamable HTTP transport on {args.host}:{args.port}{args.path}"
)
await mcp.run_streamable_http_async()
if __name__ == "__main__":


@ -4,6 +4,17 @@ Test client for Cognee MCP Server functionality.
This script tests all the tools and functions available in the Cognee MCP server,
including cognify, codify, search, prune, status checks, and utility functions.
Usage:
# Set your OpenAI API key first
export OPENAI_API_KEY="your-api-key-here"
# Run the test client
python src/test_client.py
# Or use LLM_API_KEY instead of OPENAI_API_KEY
export LLM_API_KEY="your-api-key-here"
python src/test_client.py
"""
import asyncio
@ -13,28 +24,17 @@ import time
from contextlib import asynccontextmanager
from cognee.shared.logging_utils import setup_logging
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from cognee.modules.pipelines.models.PipelineRun import PipelineRunStatus
from cognee.infrastructure.databases.exceptions import DatabaseNotCreatedError
from src.server import (
cognify,
codify,
search,
prune,
cognify_status,
codify_status,
cognee_add_developer_rules,
node_to_string,
retrieved_edges_to_string,
load_class,
)
# Import MCP client functionality for server testing
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# Set timeout for cognify/codify to complete in
TIMEOUT = 5 * 60 # 5 min in seconds
@ -50,6 +50,15 @@ class CogneeTestClient:
"""Setup test environment."""
print("🔧 Setting up test environment...")
# Check for required API keys
api_key = os.environ.get("OPENAI_API_KEY") or os.environ.get("LLM_API_KEY")
if not api_key:
print("⚠️ Warning: No OPENAI_API_KEY or LLM_API_KEY found in environment.")
print(" Some tests may fail without proper LLM API configuration.")
print(" Set OPENAI_API_KEY environment variable for full functionality.")
else:
print(f"✅ API key configured (key ending in: ...{api_key[-4:]})")
# Create temporary test files
self.test_data_dir = tempfile.mkdtemp(prefix="cognee_test_")
@ -113,11 +122,15 @@ DEBUG = True
# Get the path to the server script
server_script = os.path.join(os.path.dirname(__file__), "server.py")
# Pass current environment variables to the server process
# This ensures OpenAI API key and other config is available
server_env = os.environ.copy()
# Start the server process
server_params = StdioServerParameters(
command="python",
args=[server_script, "--transport", "stdio"],
env=None,
env=server_env,
)
async with stdio_client(server_params) as (read, write):
@ -144,6 +157,8 @@ DEBUG = True
"cognify_status",
"codify_status",
"cognee_add_developer_rules",
"list_data",
"delete",
}
available_tools = {tool.name for tool in tools_result.tools}
@ -164,16 +179,17 @@ DEBUG = True
print(f"❌ MCP server integration test failed: {e}")
async def test_prune(self):
"""Test the prune functionality."""
"""Test the prune functionality using MCP client."""
print("\n🧪 Testing prune functionality...")
try:
result = await prune()
self.test_results["prune"] = {
"status": "PASS",
"result": result,
"message": "Prune executed successfully",
}
print("✅ Prune test passed")
async with self.mcp_server_session() as session:
result = await session.call_tool("prune", arguments={})
self.test_results["prune"] = {
"status": "PASS",
"result": result,
"message": "Prune executed successfully",
}
print("✅ Prune test passed")
except Exception as e:
self.test_results["prune"] = {
"status": "FAIL",
@ -184,34 +200,44 @@ DEBUG = True
raise e
async def test_cognify(self, test_text, test_name):
"""Test the cognify functionality."""
"""Test the cognify functionality using MCP client."""
print("\n🧪 Testing cognify functionality...")
try:
# Test with simple text
cognify_result = await cognify(test_text)
# Test with simple text using MCP client
async with self.mcp_server_session() as session:
cognify_result = await session.call_tool("cognify", arguments={"data": test_text})
start = time.time() # mark the start
while True:
try:
# Wait a moment
await asyncio.sleep(5)
start = time.time() # mark the start
while True:
try:
# Wait a moment
await asyncio.sleep(5)
# Check if cognify processing is finished
status_result = await cognify_status()
if str(PipelineRunStatus.DATASET_PROCESSING_COMPLETED) in status_result[0].text:
break
elif time.time() - start > TIMEOUT:
raise TimeoutError("Cognify did not complete in 5min")
except DatabaseNotCreatedError:
if time.time() - start > TIMEOUT:
raise TimeoutError("Database was not created in 5min")
# Check if cognify processing is finished
status_result = await session.call_tool("cognify_status", arguments={})
if hasattr(status_result, "content") and status_result.content:
status_text = (
status_result.content[0].text
if status_result.content
else str(status_result)
)
else:
status_text = str(status_result)
self.test_results[test_name] = {
"status": "PASS",
"result": cognify_result,
"message": f"{test_name} executed successfully",
}
print(f"{test_name} test passed")
if str(PipelineRunStatus.DATASET_PROCESSING_COMPLETED) in status_text:
break
elif time.time() - start > TIMEOUT:
raise TimeoutError("Cognify did not complete in 5min")
except DatabaseNotCreatedError:
if time.time() - start > TIMEOUT:
raise TimeoutError("Database was not created in 5min")
self.test_results[test_name] = {
"status": "PASS",
"result": cognify_result,
"message": f"{test_name} executed successfully",
}
print(f"{test_name} test passed")
except Exception as e:
self.test_results[test_name] = {
@ -222,33 +248,45 @@ DEBUG = True
print(f"{test_name} test failed: {e}")
async def test_codify(self):
"""Test the codify functionality."""
"""Test the codify functionality using MCP client."""
print("\n🧪 Testing codify functionality...")
try:
codify_result = await codify(self.test_repo_dir)
async with self.mcp_server_session() as session:
codify_result = await session.call_tool(
"codify", arguments={"repo_path": self.test_repo_dir}
)
start = time.time() # mark the start
while True:
try:
# Wait a moment
await asyncio.sleep(5)
start = time.time() # mark the start
while True:
try:
# Wait a moment
await asyncio.sleep(5)
# Check if codify processing is finished
status_result = await codify_status()
if str(PipelineRunStatus.DATASET_PROCESSING_COMPLETED) in status_result[0].text:
break
elif time.time() - start > TIMEOUT:
raise TimeoutError("Codify did not complete in 5min")
except DatabaseNotCreatedError:
if time.time() - start > TIMEOUT:
raise TimeoutError("Database was not created in 5min")
# Check if codify processing is finished
status_result = await session.call_tool("codify_status", arguments={})
if hasattr(status_result, "content") and status_result.content:
status_text = (
status_result.content[0].text
if status_result.content
else str(status_result)
)
else:
status_text = str(status_result)
self.test_results["codify"] = {
"status": "PASS",
"result": codify_result,
"message": "Codify executed successfully",
}
print("✅ Codify test passed")
if str(PipelineRunStatus.DATASET_PROCESSING_COMPLETED) in status_text:
break
elif time.time() - start > TIMEOUT:
raise TimeoutError("Codify did not complete in 5min")
except DatabaseNotCreatedError:
if time.time() - start > TIMEOUT:
raise TimeoutError("Database was not created in 5min")
self.test_results["codify"] = {
"status": "PASS",
"result": codify_result,
"message": "Codify executed successfully",
}
print("✅ Codify test passed")
except Exception as e:
self.test_results["codify"] = {
@ -259,33 +297,47 @@ DEBUG = True
print(f"❌ Codify test failed: {e}")
async def test_cognee_add_developer_rules(self):
"""Test the cognee_add_developer_rules functionality."""
"""Test the cognee_add_developer_rules functionality using MCP client."""
print("\n🧪 Testing cognee_add_developer_rules functionality...")
try:
result = await cognee_add_developer_rules(base_path=self.test_data_dir)
async with self.mcp_server_session() as session:
result = await session.call_tool(
"cognee_add_developer_rules", arguments={"base_path": self.test_data_dir}
)
start = time.time() # mark the start
while True:
try:
# Wait a moment
await asyncio.sleep(5)
start = time.time() # mark the start
while True:
try:
# Wait a moment
await asyncio.sleep(5)
# Check if developer rule cognify processing is finished
status_result = await cognify_status()
if str(PipelineRunStatus.DATASET_PROCESSING_COMPLETED) in status_result[0].text:
break
elif time.time() - start > TIMEOUT:
raise TimeoutError("Cognify of developer rules did not complete in 5min")
except DatabaseNotCreatedError:
if time.time() - start > TIMEOUT:
raise TimeoutError("Database was not created in 5min")
# Check if developer rule cognify processing is finished
status_result = await session.call_tool("cognify_status", arguments={})
if hasattr(status_result, "content") and status_result.content:
status_text = (
status_result.content[0].text
if status_result.content
else str(status_result)
)
else:
status_text = str(status_result)
self.test_results["cognee_add_developer_rules"] = {
"status": "PASS",
"result": result,
"message": "Developer rules addition executed successfully",
}
print("✅ Developer rules test passed")
if str(PipelineRunStatus.DATASET_PROCESSING_COMPLETED) in status_text:
break
elif time.time() - start > TIMEOUT:
raise TimeoutError(
"Cognify of developer rules did not complete in 5min"
)
except DatabaseNotCreatedError:
if time.time() - start > TIMEOUT:
raise TimeoutError("Database was not created in 5min")
self.test_results["cognee_add_developer_rules"] = {
"status": "PASS",
"result": result,
"message": "Developer rules addition executed successfully",
}
print("✅ Developer rules test passed")
except Exception as e:
self.test_results["cognee_add_developer_rules"] = {
@ -296,7 +348,7 @@ DEBUG = True
print(f"❌ Developer rules test failed: {e}")
async def test_search_functionality(self):
"""Test the search functionality with different search types."""
"""Test the search functionality with different search types using MCP client."""
print("\n🧪 Testing search functionality...")
search_query = "What is artificial intelligence?"
@ -310,13 +362,17 @@ DEBUG = True
if search_type in [SearchType.NATURAL_LANGUAGE, SearchType.CYPHER]:
break
try:
result = await search(search_query, search_type.value)
self.test_results[f"search_{search_type}"] = {
"status": "PASS",
"result": result,
"message": f"Search with {search_type} successful",
}
print(f"✅ Search {search_type} test passed")
async with self.mcp_server_session() as session:
result = await session.call_tool(
"search",
arguments={"search_query": search_query, "search_type": search_type.value},
)
self.test_results[f"search_{search_type}"] = {
"status": "PASS",
"result": result,
"message": f"Search with {search_type} successful",
}
print(f"✅ Search {search_type} test passed")
except Exception as e:
self.test_results[f"search_{search_type}"] = {
"status": "FAIL",
@ -325,6 +381,168 @@ DEBUG = True
}
print(f"❌ Search {search_type} test failed: {e}")
async def test_list_data(self):
"""Test the list_data functionality."""
print("\n🧪 Testing list_data functionality...")
try:
async with self.mcp_server_session() as session:
# Test listing all datasets
result = await session.call_tool("list_data", arguments={})
if result.content and len(result.content) > 0:
content = result.content[0].text
# Check if the output contains expected elements
if "Available Datasets:" in content or "No datasets found" in content:
self.test_results["list_data_all"] = {
"status": "PASS",
"result": content[:200] + "..." if len(content) > 200 else content,
"message": "list_data (all datasets) successful",
}
print("✅ list_data (all datasets) test passed")
# If there are datasets, try to list data for the first one
if "Dataset ID:" in content:
# Extract the first dataset ID from the output
lines = content.split("\n")
dataset_id = None
for line in lines:
if "Dataset ID:" in line:
dataset_id = line.split("Dataset ID:")[1].strip()
break
if dataset_id:
# Test listing data for specific dataset
specific_result = await session.call_tool(
"list_data", arguments={"dataset_id": dataset_id}
)
if specific_result.content and len(specific_result.content) > 0:
specific_content = specific_result.content[0].text
if "Dataset:" in specific_content:
self.test_results["list_data_specific"] = {
"status": "PASS",
"result": specific_content[:200] + "..."
if len(specific_content) > 200
else specific_content,
"message": "list_data (specific dataset) successful",
}
print("✅ list_data (specific dataset) test passed")
else:
raise Exception(
"Specific dataset listing returned unexpected format"
)
else:
raise Exception("Specific dataset listing returned no content")
else:
raise Exception("list_data returned unexpected format")
else:
raise Exception("list_data returned no content")
except Exception as e:
self.test_results["list_data"] = {
"status": "FAIL",
"error": str(e),
"message": "list_data test failed",
}
print(f"❌ list_data test failed: {e}")
async def test_delete(self):
"""Test the delete functionality."""
print("\n🧪 Testing delete functionality...")
try:
async with self.mcp_server_session() as session:
# First, let's get available data to delete
list_result = await session.call_tool("list_data", arguments={})
if not (list_result.content and len(list_result.content) > 0):
raise Exception("No data available for delete test - list_data returned empty")
content = list_result.content[0].text
# Look for data IDs and dataset IDs in the content
lines = content.split("\n")
dataset_id = None
data_id = None
for line in lines:
if "Dataset ID:" in line:
dataset_id = line.split("Dataset ID:")[1].strip()
elif "Data ID:" in line:
data_id = line.split("Data ID:")[1].strip()
break # Get the first data item
if dataset_id and data_id:
# Test soft delete (default)
delete_result = await session.call_tool(
"delete",
arguments={"data_id": data_id, "dataset_id": dataset_id, "mode": "soft"},
)
if delete_result.content and len(delete_result.content) > 0:
delete_content = delete_result.content[0].text
if "Delete operation completed successfully" in delete_content:
self.test_results["delete_soft"] = {
"status": "PASS",
"result": delete_content[:200] + "..."
if len(delete_content) > 200
else delete_content,
"message": "delete (soft mode) successful",
}
print("✅ delete (soft mode) test passed")
else:
# Check if it's an expected error (like document not found)
if "not found" in delete_content.lower():
self.test_results["delete_soft"] = {
"status": "PASS",
"result": delete_content,
"message": "delete test passed with expected 'not found' error",
}
print("✅ delete test passed (expected 'not found' error)")
else:
raise Exception(
f"Delete returned unexpected content: {delete_content}"
)
else:
raise Exception("Delete returned no content")
else:
# Test with invalid UUIDs to check error handling
invalid_result = await session.call_tool(
"delete",
arguments={
"data_id": "invalid-uuid",
"dataset_id": "another-invalid-uuid",
"mode": "soft",
},
)
if invalid_result.content and len(invalid_result.content) > 0:
invalid_content = invalid_result.content[0].text
if "Invalid UUID format" in invalid_content:
self.test_results["delete_error_handling"] = {
"status": "PASS",
"result": invalid_content,
"message": "delete error handling works correctly",
}
print("✅ delete error handling test passed")
else:
raise Exception(f"Expected UUID error not found: {invalid_content}")
else:
raise Exception("Delete error test returned no content")
except Exception as e:
self.test_results["delete"] = {
"status": "FAIL",
"error": str(e),
"message": "delete test failed",
}
print(f"❌ delete test failed: {e}")
def test_utility_functions(self):
"""Test utility functions."""
print("\n🧪 Testing utility functions...")
@ -466,6 +684,10 @@ class TestModel:
await self.test_codify()
await self.test_cognee_add_developer_rules()
# Test list_data and delete functionality
await self.test_list_data()
await self.test_delete()
await self.test_search_functionality()
# Test utility functions (synchronous)
@ -506,7 +728,8 @@ class TestModel:
print(f"Failed: {failed}")
print(f"Success Rate: {(passed / total_tests * 100):.1f}%")
assert failed == 0, "\n ⚠️ Number of tests didn't pass!"
if failed > 0:
print(f"\n ⚠️ {failed} test(s) failed - review results above for details")
async def main():

cognee-mcp/uv.lock (generated): 6049 changes (diff suppressed because it is too large)


@ -353,7 +353,7 @@ def get_datasets_router() -> APIRouter:
@router.get("/status", response_model=dict[str, PipelineRunStatus])
async def get_dataset_status(
datasets: Annotated[List[UUID], Query(alias="dataset")] = None,
datasets: Annotated[List[UUID], Query(alias="dataset")] = [],
user: User = Depends(get_authenticated_user),
):
"""


@ -86,12 +86,11 @@ class KuzuAdapter(GraphDBInterface):
if (
kuzu_db_version == "0.9.0" or kuzu_db_version == "0.8.2"
) and kuzu_db_version != kuzu.__version__:
# TODO: Write migration script that will handle all user graph databases in multi-user mode
# Try to migrate kuzu database to latest version
from .kuzu_migrate import kuzu_migration
kuzu_migration(
new_db=self.db_path + "new",
new_db=self.db_path + "_new",
old_db=self.db_path,
new_version=kuzu.__version__,
old_version=kuzu_db_version,
@ -1464,11 +1463,8 @@ class KuzuAdapter(GraphDBInterface):
It raises exceptions for failures occurring during deletion processes.
"""
try:
# Use DETACH DELETE to remove both nodes and their relationships in one operation
await self.query("MATCH (n:Node) DETACH DELETE n")
logger.info("Cleared all data from graph while preserving structure")
if self.connection:
self.connection.close()
self.connection = None
if self.db:
self.db.close()


@ -94,6 +94,7 @@ def ensure_env(version: str, export_dir) -> str:
print(f"→ Setting up venv for Kùzu {version}...", file=sys.stderr)
# Create venv
# NOTE: Running python in debug mode can cause issues with creating a virtual environment from that python instance
subprocess.run([sys.executable, "-m", "venv", base], check=True)
# Install the specific Kùzu version
subprocess.run([py_bin, "-m", "pip", "install", "--upgrade", "pip"], check=True)
@ -169,6 +170,10 @@ def kuzu_migration(new_db, old_db, new_version, old_version=None, overwrite=None
# Rename new kuzu database to old kuzu database name if enabled
if overwrite or delete_old:
# Remove kuzu lock from migrated DB
lock_file = new_db + ".lock"
if os.path.exists(lock_file):
os.remove(lock_file)
rename_databases(old_db, old_version, new_db, delete_old)
print("✅ Kuzu graph database migration finished successfully!")
@ -189,7 +194,7 @@ def rename_databases(old_db: str, old_version: str, new_db: str, delete_old: boo
if os.path.isfile(old_db):
# File-based database: handle main file and accompanying lock/WAL
for ext in ["", ".lock", ".wal"]:
for ext in ["", ".wal"]:
src = old_db + ext
dst = backup_base + ext
if os.path.exists(src):
@ -211,7 +216,7 @@ def rename_databases(old_db: str, old_version: str, new_db: str, delete_old: boo
sys.exit(1)
# Now move new files into place
for ext in ["", ".lock", ".wal"]:
for ext in ["", ".wal"]:
src_new = new_db + ext
dst_new = os.path.join(base_dir, name + ext)
if os.path.exists(src_new):
@ -227,7 +232,7 @@ Examples:
%(prog)s --old-version 0.9.0 --new-version 0.11.0 \\
--old-db /path/to/old/db --new-db /path/to/new/db --overwrite
Note: This script will create virtual environments in .kuzu_envs/ directory
Note: This script will create temporary virtual environments in .kuzu_envs/ directory
to isolate different Kuzu versions.
""",
formatter_class=argparse.RawDescriptionHelpFormatter,


@ -18,12 +18,12 @@ class Data(Base):
mime_type = Column(String)
raw_data_location = Column(String)
owner_id = Column(UUID, index=True)
tenant_id = Column(UUID, index=True, default=None)
tenant_id = Column(UUID, index=True, nullable=True)
content_hash = Column(String)
external_metadata = Column(JSON)
node_set = Column(JSON, nullable=True) # Store NodeSet as JSON list of strings
token_count = Column(Integer)
data_size = Column(Integer) # File size in bytes
data_size = Column(Integer, nullable=True) # File size in bytes
created_at = Column(DateTime(timezone=True), default=lambda: datetime.now(timezone.utc))
updated_at = Column(DateTime(timezone=True), onupdate=lambda: datetime.now(timezone.utc))


@ -1,3 +1,4 @@
import time
from cognee.shared.logging_utils import get_logger
from typing import List, Dict, Union, Optional, Type
@ -154,11 +155,16 @@ class CogneeGraph(CogneeAbstractGraph):
raise ValueError("Failed to generate query embedding.")
if edge_distances is None:
start_time = time.time()
edge_distances = await vector_engine.search(
collection_name="EdgeType_relationship_name",
query_vector=query_vector,
limit=0,
)
projection_time = time.time() - start_time
logger.info(
f"Edge collection distances were calculated separately from nodes in {projection_time:.2f}s"
)
embedding_map = {result.payload["text"]: result.score for result in edge_distances}


@ -1,4 +1,5 @@
import asyncio
import time
from typing import List, Optional, Type
from cognee.shared.logging_utils import get_logger, ERROR
@ -174,6 +175,8 @@ async def brute_force_search(
return []
try:
start_time = time.time()
results = await asyncio.gather(
*[search_in_collection(collection_name) for collection_name in collections]
)
@ -181,6 +184,12 @@ async def brute_force_search(
if all(not item for item in results):
return []
# Final statistics
projection_time = time.time() - start_time
logger.info(
f"Vector collection retrieval completed: Retrieved distances from {sum(1 for res in results if res)} collections in {projection_time:.2f}s"
)
node_distances = {collection: result for collection, result in zip(collections, results)}
edge_distances = node_distances.get("EdgeType_relationship_name", None)


@ -28,18 +28,38 @@ class TestGraphCompletionRetriever:
class Company(DataPoint):
name: str
description: str
class Person(DataPoint):
name: str
description: str
works_for: Company
company1 = Company(name="Figma")
company2 = Company(name="Canva")
person1 = Person(name="Steve Rodger", works_for=company1)
person2 = Person(name="Ike Loma", works_for=company1)
person3 = Person(name="Jason Statham", works_for=company1)
person4 = Person(name="Mike Broski", works_for=company2)
person5 = Person(name="Christina Mayer", works_for=company2)
company1 = Company(name="Figma", description="Figma is a company")
company2 = Company(name="Canva", description="Canvas is a company")
person1 = Person(
name="Steve Rodger",
description="This is description about Steve Rodger",
works_for=company1,
)
person2 = Person(
name="Ike Loma", description="This is description about Ike Loma", works_for=company1
)
person3 = Person(
name="Jason Statham",
description="This is description about Jason Statham",
works_for=company1,
)
person4 = Person(
name="Mike Broski",
description="This is description about Mike Broski",
works_for=company2,
)
person5 = Person(
name="Christina Mayer",
description="This is description about Christina Mayer",
works_for=company2,
)
entities = [company1, company2, person1, person2, person3, person4, person5]
@ -49,8 +69,63 @@ class TestGraphCompletionRetriever:
context = await retriever.get_context("Who works at Canva?")
assert "Mike Broski --[works_for]--> Canva" in context, "Failed to get Mike Broski"
assert "Christina Mayer --[works_for]--> Canva" in context, "Failed to get Christina Mayer"
# Ensure the top-level sections are present
assert "Nodes:" in context, "Missing 'Nodes:' section in context"
assert "Connections:" in context, "Missing 'Connections:' section in context"
# --- Nodes headers ---
assert "Node: Steve Rodger" in context, "Missing node header for Steve Rodger"
assert "Node: Figma" in context, "Missing node header for Figma"
assert "Node: Ike Loma" in context, "Missing node header for Ike Loma"
assert "Node: Jason Statham" in context, "Missing node header for Jason Statham"
assert "Node: Mike Broski" in context, "Missing node header for Mike Broski"
assert "Node: Canva" in context, "Missing node header for Canva"
assert "Node: Christina Mayer" in context, "Missing node header for Christina Mayer"
# --- Node contents ---
assert (
"__node_content_start__\nThis is description about Steve Rodger\n__node_content_end__"
in context
), "Description block for Steve Rodger altered"
assert "__node_content_start__\nFigma is a company\n__node_content_end__" in context, (
"Description block for Figma altered"
)
assert (
"__node_content_start__\nThis is description about Ike Loma\n__node_content_end__"
in context
), "Description block for Ike Loma altered"
assert (
"__node_content_start__\nThis is description about Jason Statham\n__node_content_end__"
in context
), "Description block for Jason Statham altered"
assert (
"__node_content_start__\nThis is description about Mike Broski\n__node_content_end__"
in context
), "Description block for Mike Broski altered"
assert "__node_content_start__\nCanvas is a company\n__node_content_end__" in context, (
"Description block for Canva altered"
)
assert (
"__node_content_start__\nThis is description about Christina Mayer\n__node_content_end__"
in context
), "Description block for Christina Mayer altered"
# --- Connections ---
assert "Steve Rodger --[works_for]--> Figma" in context, (
"Connection Steve Rodger→Figma missing or changed"
)
assert "Ike Loma --[works_for]--> Figma" in context, (
"Connection Ike Loma→Figma missing or changed"
)
assert "Jason Statham --[works_for]--> Figma" in context, (
"Connection Jason Statham→Figma missing or changed"
)
assert "Mike Broski --[works_for]--> Canva" in context, (
"Connection Mike Broski→Canva missing or changed"
)
assert "Christina Mayer --[works_for]--> Canva" in context, (
"Connection Christina Mayer→Canva missing or changed"
)
@pytest.mark.asyncio
async def test_graph_completion_context_complex(self):


@ -1,3 +1,34 @@
# Cognee Docker Compose Configuration
#
# This docker-compose file includes the main Cognee API server and optional services:
#
# BASIC USAGE:
# Start main Cognee API server:
# docker-compose up cognee
#
# MCP SERVER USAGE:
# The MCP (Model Context Protocol) server enables IDE integration with tools like Cursor, Claude Desktop, etc.
#
# Start with MCP server (stdio transport - recommended):
# docker-compose --profile mcp up
#
# Start with MCP server (SSE transport for HTTP access):
# TRANSPORT_MODE=sse docker-compose --profile mcp up
#
# PORT CONFIGURATION:
# - Main Cognee API: http://localhost:8000
# - MCP Server (SSE mode): http://localhost:8001
# - Frontend (UI): http://localhost:3000 (with --profile ui)
#
# DEBUGGING:
# Enable debug mode by setting DEBUG=true in your .env file or:
# DEBUG=true docker-compose --profile mcp up
#
# This exposes debugger ports:
# - Main API debugger: localhost:5678
# - MCP Server debugger: localhost:5679
services:
cognee:
container_name: cognee
@ -26,6 +57,49 @@ services:
cpus: "4.0"
memory: 8GB
# Cognee MCP Server - Model Context Protocol server for IDE integration
cognee-mcp:
container_name: cognee-mcp
profiles:
- mcp
networks:
- cognee-network
build:
context: .
dockerfile: cognee-mcp/Dockerfile
volumes:
- .env:/app/.env
# Optional: Mount local data for ingestion
- ./examples/data:/app/data:ro
environment:
- DEBUG=false # Change to true if debugging
- ENVIRONMENT=local
- LOG_LEVEL=INFO
- TRANSPORT_MODE=stdio # Use 'sse' for Server-Sent Events over HTTP
# Database configuration - should match the main cognee service
- DB_TYPE=${DB_TYPE:-sqlite}
- DB_HOST=${DB_HOST:-host.docker.internal}
- DB_PORT=${DB_PORT:-5432}
- DB_NAME=${DB_NAME:-cognee_db}
- DB_USERNAME=${DB_USERNAME:-cognee}
- DB_PASSWORD=${DB_PASSWORD:-cognee}
# MCP specific configuration
- MCP_LOG_LEVEL=INFO
- PYTHONUNBUFFERED=1
extra_hosts:
- "host.docker.internal:host-gateway"
ports:
# Only expose ports when using SSE transport
- "8001:8000" # MCP SSE port (mapped to avoid conflict with main API)
- "5679:5678" # MCP debugger port (different from main service)
depends_on:
- cognee
deploy:
resources:
limits:
cpus: "2.0"
memory: 4GB
# NOTE: The frontend is a work in progress and supports only the minimum set of features required to be functional.
# If you want to use Cognee with a UI environment, you can integrate the Cognee MCP Server into Cursor / Claude Desktop / Visual Studio Code (through Cline/Roo)
frontend:


@ -4,6 +4,12 @@ set -e # Exit on error
echo "Debug mode: $DEBUG"
echo "Environment: $ENVIRONMENT"
# Set default ports if not specified
DEBUG_PORT=${DEBUG_PORT:-5678}
HTTP_PORT=${HTTP_PORT:-8000}
echo "Debug port: $DEBUG_PORT"
echo "HTTP port: $HTTP_PORT"
# Run Alembic migrations with proper error handling.
# Note on UserAlreadyExists error handling:
# During database migrations, we attempt to create a default user. If this user
@ -37,10 +43,10 @@ sleep 2
if [ "$ENVIRONMENT" = "dev" ] || [ "$ENVIRONMENT" = "local" ]; then
if [ "$DEBUG" = "true" ]; then
echo "Waiting for the debugger to attach..."
debugpy --wait-for-client --listen 0.0.0.0:5678 -m gunicorn -w 3 -k uvicorn.workers.UvicornWorker -t 30000 --bind=0.0.0.0:8000 --log-level debug --reload cognee.api.client:app
debugpy --wait-for-client --listen 0.0.0.0:$DEBUG_PORT -m gunicorn -w 1 -k uvicorn.workers.UvicornWorker -t 30000 --bind=0.0.0.0:$HTTP_PORT --log-level debug --reload cognee.api.client:app
else
gunicorn -w 3 -k uvicorn.workers.UvicornWorker -t 30000 --bind=0.0.0.0:8000 --log-level debug --reload cognee.api.client:app
gunicorn -w 1 -k uvicorn.workers.UvicornWorker -t 30000 --bind=0.0.0.0:$HTTP_PORT --log-level debug --reload cognee.api.client:app
fi
else
gunicorn -w 3 -k uvicorn.workers.UvicornWorker -t 30000 --bind=0.0.0.0:8000 --log-level error cognee.api.client:app
gunicorn -w 1 -k uvicorn.workers.UvicornWorker -t 30000 --bind=0.0.0.0:$HTTP_PORT --log-level error cognee.api.client:app
fi

poetry.lock (generated): 863 changes (diff suppressed because it is too large)


@ -1,6 +1,7 @@
[project]
name = "cognee"
version = "0.2.1-dev"
version = "0.2.2.dev0"
description = "Cognee - is a library for enriching LLM context with a semantic layer for better understanding and reasoning."
authors = [
{ name = "Vasilije Markovic" },

uv.lock (generated): 725 changes (diff suppressed because it is too large)