From eb8996dd8171beea712513679a356d84ee0567ec Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 11:08:30 +0100 Subject: [PATCH 01/41] feat: extends CacheDBInterface base class with the logging related methods --- .../databases/cache/cache_db_interface.py | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/cognee/infrastructure/databases/cache/cache_db_interface.py b/cognee/infrastructure/databases/cache/cache_db_interface.py index 801e86188..97782aa7c 100644 --- a/cognee/infrastructure/databases/cache/cache_db_interface.py +++ b/cognee/infrastructure/databases/cache/cache_db_interface.py @@ -77,3 +77,37 @@ class CacheDBInterface(ABC): Gracefully close any async connections. """ pass + + @abstractmethod + async def log_usage( + self, + user_id: str, + log_entry: dict, + ttl: int | None = 604800, + ): + """ + Log usage information (API endpoint calls, MCP tool invocations) to cache. + + Args: + user_id: The user ID. + log_entry: Dictionary containing usage log information. + ttl: Optional time-to-live (seconds). If provided, the log list expires after this time. + + Raises: + CacheConnectionError: If cache connection fails or times out. + """ + pass + + @abstractmethod + async def get_usage_logs(self, user_id: str, limit: int = 100): + """ + Retrieve usage logs for a given user. + + Args: + user_id: The user ID. + limit: Maximum number of logs to retrieve (default: 100). + + Returns: + List of usage log entries, most recent first. + """ + pass From 90cf79b4206f757efde1970117e9dce282491516 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 11:10:13 +0100 Subject: [PATCH 02/41] feat: adds new config params to Cacheconfig --- cognee/infrastructure/databases/cache/config.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/cognee/infrastructure/databases/cache/config.py b/cognee/infrastructure/databases/cache/config.py index 88ac05885..de316d3fe 100644 --- a/cognee/infrastructure/databases/cache/config.py +++ b/cognee/infrastructure/databases/cache/config.py @@ -13,6 +13,8 @@ class CacheConfig(BaseSettings): - cache_port: Port number for the cache service. - agentic_lock_expire: Automatic lock expiration time (in seconds). - agentic_lock_timeout: Maximum time (in seconds) to wait for the lock release. + - usage_logging: Enable/disable usage logging for API endpoints and MCP tools. + - usage_logging_ttl: Time-to-live for usage logs in seconds (default: 7 days). """ cache_backend: Literal["redis", "fs"] = "fs" @@ -24,6 +26,8 @@ class CacheConfig(BaseSettings): cache_password: Optional[str] = None agentic_lock_expire: int = 240 agentic_lock_timeout: int = 300 + usage_logging: bool = False + usage_logging_ttl: int = 604800 model_config = SettingsConfigDict(env_file=".env", extra="allow") @@ -38,6 +42,8 @@ class CacheConfig(BaseSettings): "cache_password": self.cache_password, "agentic_lock_expire": self.agentic_lock_expire, "agentic_lock_timeout": self.agentic_lock_timeout, + "usage_logging": self.usage_logging, + "usage_logging_ttl": self.usage_logging_ttl, } From 8f0705359a7ae3b942f1167c84a154f9b92b36a4 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 11:13:03 +0100 Subject: [PATCH 03/41] feat: adds log usage and get logs operations to RedisAdapter --- .../databases/cache/redis/RedisAdapter.py | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/cognee/infrastructure/databases/cache/redis/RedisAdapter.py b/cognee/infrastructure/databases/cache/redis/RedisAdapter.py index b0c51d68e..d3809f4dc 100644 --- a/cognee/infrastructure/databases/cache/redis/RedisAdapter.py +++ b/cognee/infrastructure/databases/cache/redis/RedisAdapter.py @@ -177,6 +177,64 @@ class RedisAdapter(CacheDBInterface): entries = await self.async_redis.lrange(session_key, 0, -1) return [json.loads(e) for e in entries] + async def log_usage( + self, + user_id: str, + log_entry: dict, + ttl: int | None = 604800, + ): + """ + Log usage information (API endpoint calls, MCP tool invocations) to Redis. + + Args: + user_id: The user ID. + log_entry: Dictionary containing usage log information. + ttl: Optional time-to-live (seconds). If provided, the log list expires after this time. + + Raises: + CacheConnectionError: If Redis connection fails or times out. + """ + try: + usage_logs_key = f"usage_logs:{user_id}" + + await self.async_redis.rpush(usage_logs_key, json.dumps(log_entry)) + + if ttl is not None: + await self.async_redis.expire(usage_logs_key, ttl) + + except (redis.ConnectionError, redis.TimeoutError) as e: + error_msg = f"Redis connection error while logging usage: {str(e)}" + logger.error(error_msg) + raise CacheConnectionError(error_msg) from e + except Exception as e: + error_msg = f"Unexpected error while logging usage to Redis: {str(e)}" + logger.error(error_msg) + raise CacheConnectionError(error_msg) from e + + async def get_usage_logs(self, user_id: str, limit: int = 100): + """ + Retrieve usage logs for a given user. + + Args: + user_id: The user ID. + limit: Maximum number of logs to retrieve (default: 100). + + Returns: + List of usage log entries, most recent first. + """ + try: + usage_logs_key = f"usage_logs:{user_id}" + entries = await self.async_redis.lrange(usage_logs_key, -limit, -1) + return [json.loads(e) for e in reversed(entries)] if entries else [] + except (redis.ConnectionError, redis.TimeoutError) as e: + error_msg = f"Redis connection error while retrieving usage logs: {str(e)}" + logger.error(error_msg) + raise CacheConnectionError(error_msg) from e + except Exception as e: + error_msg = f"Unexpected error while retrieving usage logs from Redis: {str(e)}" + logger.error(error_msg) + raise CacheConnectionError(error_msg) from e + async def close(self): """ Gracefully close the async Redis connection. From f4c2365c23dde7e85947c85fc1e9413a5556973b Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 11:48:10 +0100 Subject: [PATCH 04/41] Adds default methods to satisfy base class (FSCache does not support the logging for now) --- .../databases/cache/fscache/FsCacheAdapter.py | 21 +++++++++++++++++++ 1 file changed, 21 insertions(+) diff --git a/cognee/infrastructure/databases/cache/fscache/FsCacheAdapter.py b/cognee/infrastructure/databases/cache/fscache/FsCacheAdapter.py index 497e6afec..6cedcdc8f 100644 --- a/cognee/infrastructure/databases/cache/fscache/FsCacheAdapter.py +++ b/cognee/infrastructure/databases/cache/fscache/FsCacheAdapter.py @@ -89,6 +89,27 @@ class FSCacheAdapter(CacheDBInterface): return None return json.loads(value) + async def log_usage( + self, + user_id: str, + log_entry: dict, + ttl: int | None = 604800, + ): + """ + Usage logging is not supported in filesystem cache backend. + This method is a no-op to satisfy the interface. + """ + logger.warning("Usage logging not supported in FSCacheAdapter, skipping") + pass + + async def get_usage_logs(self, user_id: str, limit: int = 100): + """ + Usage logging is not supported in filesystem cache backend. + This method returns an empty list to satisfy the interface. + """ + logger.warning("Usage logging not supported in FSCacheAdapter, returning empty list") + return [] + async def close(self): if self.cache is not None: self.cache.expire() From 97fcc15af5fd36a9248ac976f8da9b51f523bdb5 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 11:49:37 +0100 Subject: [PATCH 05/41] feat: updates constructor params in base class --- cognee/infrastructure/databases/cache/cache_db_interface.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cognee/infrastructure/databases/cache/cache_db_interface.py b/cognee/infrastructure/databases/cache/cache_db_interface.py index 97782aa7c..1e7949554 100644 --- a/cognee/infrastructure/databases/cache/cache_db_interface.py +++ b/cognee/infrastructure/databases/cache/cache_db_interface.py @@ -8,10 +8,11 @@ class CacheDBInterface(ABC): Provides a common interface for lock acquisition, release, and context-managed locking. """ - def __init__(self, host: str, port: int, lock_key: str): + def __init__(self, host: str, port: int, lock_key: str = "default_lock", log_key: str = "usage_logs"): self.host = host self.port = port self.lock_key = lock_key + self.log_key = log_key self.lock = None @abstractmethod From e8edf4482d1de5d302ed16b2dcaeb9717617ab7a Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 11:53:36 +0100 Subject: [PATCH 06/41] feat: adds usage logging and log key to the cache engine factory --- .../infrastructure/databases/cache/get_cache_engine.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/cognee/infrastructure/databases/cache/get_cache_engine.py b/cognee/infrastructure/databases/cache/get_cache_engine.py index f70358607..62dff7465 100644 --- a/cognee/infrastructure/databases/cache/get_cache_engine.py +++ b/cognee/infrastructure/databases/cache/get_cache_engine.py @@ -1,7 +1,6 @@ """Factory to get the appropriate cache coordination engine (e.g., Redis).""" from functools import lru_cache -import os from typing import Optional from cognee.infrastructure.databases.cache.config import get_cache_config from cognee.infrastructure.databases.cache.cache_db_interface import CacheDBInterface @@ -17,6 +16,7 @@ def create_cache_engine( cache_username: str, cache_password: str, lock_key: str, + log_key: str, agentic_lock_expire: int = 240, agentic_lock_timeout: int = 300, ): @@ -30,6 +30,7 @@ def create_cache_engine( - cache_username: Username to authenticate with. - cache_password: Password to authenticate with. - lock_key: Identifier used for the locking resource. + - log_key: Identifier used for usage logging. - agentic_lock_expire: Duration to hold the lock after acquisition. - agentic_lock_timeout: Max time to wait for the lock before failing. @@ -37,7 +38,7 @@ def create_cache_engine( -------- - CacheDBInterface: An instance of the appropriate cache adapter. """ - if config.caching: + if config.caching or config.usage_logging: from cognee.infrastructure.databases.cache.redis.RedisAdapter import RedisAdapter if config.cache_backend == "redis": @@ -47,6 +48,7 @@ def create_cache_engine( username=cache_username, password=cache_password, lock_name=lock_key, + log_key=log_key, timeout=agentic_lock_expire, blocking_timeout=agentic_lock_timeout, ) @@ -61,7 +63,7 @@ def create_cache_engine( return None -def get_cache_engine(lock_key: Optional[str] = None) -> CacheDBInterface: +def get_cache_engine(lock_key: Optional[str] = None, log_key: Optional[str] = None) -> CacheDBInterface: """ Returns a cache adapter instance using current context configuration. """ @@ -72,6 +74,7 @@ def get_cache_engine(lock_key: Optional[str] = None) -> CacheDBInterface: cache_username=config.cache_username, cache_password=config.cache_password, lock_key=lock_key, + log_key=log_key, agentic_lock_expire=config.agentic_lock_expire, agentic_lock_timeout=config.agentic_lock_timeout, ) From 8b49f892ce670be22de2ae1372a3135ac5fe6ac8 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 11:53:54 +0100 Subject: [PATCH 07/41] ruff fix --- cognee/infrastructure/databases/cache/cache_db_interface.py | 4 +++- cognee/infrastructure/databases/cache/get_cache_engine.py | 4 +++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/cognee/infrastructure/databases/cache/cache_db_interface.py b/cognee/infrastructure/databases/cache/cache_db_interface.py index 1e7949554..c93cf652e 100644 --- a/cognee/infrastructure/databases/cache/cache_db_interface.py +++ b/cognee/infrastructure/databases/cache/cache_db_interface.py @@ -8,7 +8,9 @@ class CacheDBInterface(ABC): Provides a common interface for lock acquisition, release, and context-managed locking. """ - def __init__(self, host: str, port: int, lock_key: str = "default_lock", log_key: str = "usage_logs"): + def __init__( + self, host: str, port: int, lock_key: str = "default_lock", log_key: str = "usage_logs" + ): self.host = host self.port = port self.lock_key = lock_key diff --git a/cognee/infrastructure/databases/cache/get_cache_engine.py b/cognee/infrastructure/databases/cache/get_cache_engine.py index 62dff7465..85a59bfde 100644 --- a/cognee/infrastructure/databases/cache/get_cache_engine.py +++ b/cognee/infrastructure/databases/cache/get_cache_engine.py @@ -63,7 +63,9 @@ def create_cache_engine( return None -def get_cache_engine(lock_key: Optional[str] = None, log_key: Optional[str] = None) -> CacheDBInterface: +def get_cache_engine( + lock_key: Optional[str] = None, log_key: Optional[str] = None +) -> CacheDBInterface: """ Returns a cache adapter instance using current context configuration. """ From ace34b9a91e8d4c9c4e4ba724fab71fd88756319 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 11:56:01 +0100 Subject: [PATCH 08/41] feat: adds log key to RedisAdapter --- .../infrastructure/databases/cache/redis/RedisAdapter.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/cognee/infrastructure/databases/cache/redis/RedisAdapter.py b/cognee/infrastructure/databases/cache/redis/RedisAdapter.py index d3809f4dc..98021e968 100644 --- a/cognee/infrastructure/databases/cache/redis/RedisAdapter.py +++ b/cognee/infrastructure/databases/cache/redis/RedisAdapter.py @@ -17,13 +17,14 @@ class RedisAdapter(CacheDBInterface): host, port, lock_name="default_lock", + log_key="usage_logs", username=None, password=None, timeout=240, blocking_timeout=300, connection_timeout=30, ): - super().__init__(host, port, lock_name) + super().__init__(host, port, lock_name, log_key) self.host = host self.port = port @@ -195,7 +196,7 @@ class RedisAdapter(CacheDBInterface): CacheConnectionError: If Redis connection fails or times out. """ try: - usage_logs_key = f"usage_logs:{user_id}" + usage_logs_key = f"{self.log_key}:{user_id}" await self.async_redis.rpush(usage_logs_key, json.dumps(log_entry)) @@ -223,7 +224,7 @@ class RedisAdapter(CacheDBInterface): List of usage log entries, most recent first. """ try: - usage_logs_key = f"usage_logs:{user_id}" + usage_logs_key = f"{self.log_key}:{user_id}" entries = await self.async_redis.lrange(usage_logs_key, -limit, -1) return [json.loads(e) for e in reversed(entries)] if entries else [] except (redis.ConnectionError, redis.TimeoutError) as e: From 34513f2c108ca022d19230a0a262edc6069cb3c8 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 12:46:19 +0100 Subject: [PATCH 09/41] fix: fixes unit test for cacheconfig params --- .../unit/infrastructure/databases/cache/test_cache_config.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cognee/tests/unit/infrastructure/databases/cache/test_cache_config.py b/cognee/tests/unit/infrastructure/databases/cache/test_cache_config.py index 837a9955c..ace43628a 100644 --- a/cognee/tests/unit/infrastructure/databases/cache/test_cache_config.py +++ b/cognee/tests/unit/infrastructure/databases/cache/test_cache_config.py @@ -62,6 +62,8 @@ def test_cache_config_to_dict(): "cache_password": None, "agentic_lock_expire": 100, "agentic_lock_timeout": 200, + "usage_logging": False, + "usage_logging_ttl": 604800, } From e803f10417816ddfe4bc52f43d34e286ac97e280 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 14:22:01 +0100 Subject: [PATCH 10/41] feat: implements first version of usage_logger decorator --- cognee/shared/usage_logger.py | 311 ++++++++++++++++++++++++++++++++++ 1 file changed, 311 insertions(+) create mode 100644 cognee/shared/usage_logger.py diff --git a/cognee/shared/usage_logger.py b/cognee/shared/usage_logger.py new file mode 100644 index 000000000..782c8f762 --- /dev/null +++ b/cognee/shared/usage_logger.py @@ -0,0 +1,311 @@ +import asyncio +import inspect +import os +from datetime import datetime, timezone +from functools import wraps +from typing import Any, Callable, Optional +from uuid import UUID + +from cognee.infrastructure.databases.cache.config import get_cache_config +from cognee.infrastructure.databases.cache.get_cache_engine import get_cache_engine +from cognee.shared.logging_utils import get_logger +from cognee import __version__ as cognee_version + +logger = get_logger("usage_logger") + + +def _sanitize_value(value: Any) -> Any: + """Ensure value is JSON serializable - converts non-serializable values to default messages.""" + if value is None: + return None + if isinstance(value, (str, int, float, bool)): + return value + if isinstance(value, (UUID,)): + return str(value) + if isinstance(value, datetime): + return value.isoformat() + if isinstance(value, (list, tuple)): + return [_sanitize_value(v) for v in value] + if isinstance(value, dict): + sanitized = {} + for k, v in value.items(): + if isinstance(k, str): + key_str = k + else: + sanitized_key = _sanitize_value(k) + if isinstance(sanitized_key, str): + key_str = sanitized_key + else: + key_str = str(sanitized_key) if sanitized_key != "" else f"" + sanitized[key_str] = _sanitize_value(v) + return sanitized + try: + str_repr = str(value) + if str_repr.startswith("<") and str_repr.endswith(">"): + return f"" + return str_repr + except Exception: + return f"" + + +def _extract_user_id(args: tuple, kwargs: dict, func: Callable) -> Optional[str]: + """Extract user_id from function arguments if available.""" + try: + if "user" in kwargs and kwargs["user"] is not None: + user = kwargs["user"] + if hasattr(user, "id"): + return str(user.id) + + sig = inspect.signature(func) + param_names = list(sig.parameters.keys()) + for i, param_name in enumerate(param_names): + if i < len(args) and param_name == "user": + user = args[i] + if user is not None and hasattr(user, "id"): + return str(user.id) + return None + except Exception: + return None + + +def _extract_parameters(args: tuple, kwargs: dict, func: Callable) -> dict: + """Extract function parameters - captures all parameters, sanitizes for JSON.""" + params = {} + + for key, value in kwargs.items(): + if key != "user": + params[key] = _sanitize_value(value) + + try: + sig = inspect.signature(func) + param_names = list(sig.parameters.keys()) + for i, param_name in enumerate(param_names): + if i < len(args) and param_name != "user" and param_name not in kwargs: + params[param_name] = _sanitize_value(args[i]) + except Exception: + for i, arg_value in enumerate(args): + if i not in params.values(): + params[f"arg_{i}"] = _sanitize_value(arg_value) + + return params + + +async def _log_usage_async( + function_name: str, + log_type: str, + user_id: Optional[str], + parameters: dict, + result: Any, + success: bool, + error: Optional[str], + duration_ms: float, + start_time: datetime, + end_time: datetime, +): + """Asynchronously log function usage to Redis.""" + try: + logger.debug(f"Starting to log usage for {function_name} at {start_time.isoformat()}") + config = get_cache_config() + if not config.usage_logging: + logger.debug("Usage logging disabled, skipping log") + return + + logger.debug(f"Getting cache engine for {function_name}") + cache_engine = get_cache_engine(lock_key=None, log_key="usage_logging") + if cache_engine is None: + logger.warning( + f"Cache engine not available for usage logging (function: {function_name})" + ) + return + + logger.debug(f"Cache engine obtained for {function_name}") + + if user_id is None: + user_id = "unknown" + logger.debug(f"No user_id provided, using 'unknown' for {function_name}") + + log_entry = { + "timestamp": start_time.isoformat(), + "type": log_type, + "function_name": function_name, + "user_id": user_id, + "parameters": parameters, + "result": _sanitize_value(result), + "success": success, + "error": error, + "duration_ms": round(duration_ms, 2), + "start_time": start_time.isoformat(), + "end_time": end_time.isoformat(), + "metadata": { + "cognee_version": cognee_version, + "environment": os.getenv("ENV", "prod"), + }, + } + + logger.debug(f"Calling log_usage for {function_name}, user_id={user_id}") + await cache_engine.log_usage( + user_id=user_id, + log_entry=log_entry, + ttl=config.usage_logging_ttl, + ) + logger.info(f"Successfully logged usage for {function_name} (user_id={user_id})") + except Exception as e: + logger.error(f"Failed to log usage for {function_name}: {str(e)}", exc_info=True) + + +def log_usage(function_name: Optional[str] = None, log_type: str = "function"): + """ + Decorator to log function usage to Redis. + + This decorator is completely transparent - it doesn't change function behavior. + It logs function name, parameters, result, timing, and user (if available). + + Args: + function_name: Optional name for the function (defaults to func.__name__) + log_type: Type of log entry (e.g., "api_endpoint", "mcp_tool", "function") + + Usage: + @log_usage() + async def my_function(...): + # function code + + @log_usage(function_name="POST /v1/add", log_type="api_endpoint") + async def add(...): + # endpoint code + """ + + def decorator(func: Callable) -> Callable: + @wraps(func) + async def async_wrapper(*args, **kwargs): + config = get_cache_config() + if not config.usage_logging: + return await func(*args, **kwargs) + + # Capture start time + start_time = datetime.now(timezone.utc) + + user_id = _extract_user_id(args, kwargs, func) + parameters = _extract_parameters(args, kwargs, func) + + result = None + success = True + error = None + + try: + result = await func(*args, **kwargs) + return result + except Exception as e: + success = False + error = str(e) + raise + finally: + end_time = datetime.now(timezone.utc) + duration_ms = (end_time - start_time).total_seconds() * 1000 + + try: + await _log_usage_async( + function_name=function_name or func.__name__, + log_type=log_type, + user_id=user_id, + parameters=parameters, + result=result, + success=success, + error=error, + duration_ms=duration_ms, + start_time=start_time, + end_time=end_time, + ) + except Exception as e: + logger.error( + f"Failed to log usage for {function_name or func.__name__}: {str(e)}", + exc_info=True, + ) + + return async_wrapper + + return decorator + + +if __name__ == "__main__": + # Example 1: Simple function with decorator + @log_usage(function_name="example_function", log_type="example") + async def example_function(param1: str, param2: int, user=None): + """Example function to demonstrate usage logging.""" + await asyncio.sleep(0.1) # Simulate some work + return {(1, 2): "ok"} + + # Example 2: Function with user parameter + class MockUser: + def __init__(self, user_id: str): + self.id = user_id + + @log_usage(function_name="example_with_user", log_type="example") + async def example_with_user(data: str, user: MockUser, wrong_param=datetime.utcnow().isoformat()): + """Example function with user parameter.""" + await asyncio.sleep(0.05) + return float("nan") + + + @log_usage(function_name="returns_cycle", log_type="function") + async def returns_cycle(): + a = [] + a.append(a) + return a + + async def run_example(): + """Run example demonstrations.""" + print("Usage Logger Example") + print("=" * 50) + + # Example 1: Simple function + print("\n1. Running example function:") + result1 = await example_function("example_data", 42) + print(f" Result: {result1}") + await asyncio.sleep(0.2) # Wait for async logging to complete + + # Example 2: Function with user + print("\n2. Running example function with user:") + mock_user = MockUser("example-user-123") + result2 = await example_with_user("sample_data", user=mock_user, wrong_param=datetime.utcnow().isoformat()) + result3 = await example_with_user("sample_data", user=mock_user) + + print(f" Result: {result2}") + await asyncio.sleep(0.2) # Wait for async logging to complete + + await returns_cycle() + + # Example 3: Retrieve logs (if cache engine is available) + print("\n3. Retrieving usage logs:") + try: + config = get_cache_config() + if config.usage_logging: + cache_engine = get_cache_engine(lock_key="usage_logging") + if cache_engine: + # Get logs for the user + user_id = str(mock_user.id) + logs = await cache_engine.get_usage_logs(user_id, limit=10) + print(f" Found {len(logs)} log entries for user {user_id}") + if logs: + print( + f" Latest log: {logs[0]['function_name']} at {logs[0]['timestamp']}" + ) + else: + print(" Cache engine not available") + else: + print(" Usage logging is disabled (set USAGE_LOGGING=true)") + except Exception as e: + print(f" Error retrieving logs: {str(e)}") + + print("\n" + "=" * 50) + print("Example completed!") + print("\nNote: Make sure to set these environment variables:") + print(" - USAGE_LOGGING=true") + print(" - CACHING=true (or ensure cache backend is configured)") + print(" - CACHE_BACKEND=redis (or fs)") + print(" - CACHE_HOST=localhost") + print(" - CACHE_PORT=6379") + + + + # Run the example + asyncio.run(run_example()) From bf2357e7bf8019403c85e066c6c729dae4551fc2 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 15:16:26 +0100 Subject: [PATCH 11/41] chore: cleaning usage logger logic --- cognee/shared/usage_logger.py | 107 ++++++++++++++++++++++------------ 1 file changed, 70 insertions(+), 37 deletions(-) diff --git a/cognee/shared/usage_logger.py b/cognee/shared/usage_logger.py index 782c8f762..4a9b1d860 100644 --- a/cognee/shared/usage_logger.py +++ b/cognee/shared/usage_logger.py @@ -2,7 +2,7 @@ import asyncio import inspect import os from datetime import datetime, timezone -from functools import wraps +from functools import singledispatch, wraps from typing import Any, Callable, Optional from uuid import UUID @@ -14,31 +14,9 @@ from cognee import __version__ as cognee_version logger = get_logger("usage_logger") +@singledispatch def _sanitize_value(value: Any) -> Any: - """Ensure value is JSON serializable - converts non-serializable values to default messages.""" - if value is None: - return None - if isinstance(value, (str, int, float, bool)): - return value - if isinstance(value, (UUID,)): - return str(value) - if isinstance(value, datetime): - return value.isoformat() - if isinstance(value, (list, tuple)): - return [_sanitize_value(v) for v in value] - if isinstance(value, dict): - sanitized = {} - for k, v in value.items(): - if isinstance(k, str): - key_str = k - else: - sanitized_key = _sanitize_value(k) - if isinstance(sanitized_key, str): - key_str = sanitized_key - else: - key_str = str(sanitized_key) if sanitized_key != "" else f"" - sanitized[key_str] = _sanitize_value(v) - return sanitized + """Default handler for JSON serialization - converts to string.""" try: str_repr = str(value) if str_repr.startswith("<") and str_repr.endswith(">"): @@ -48,7 +26,64 @@ def _sanitize_value(value: Any) -> Any: return f"" -def _extract_user_id(args: tuple, kwargs: dict, func: Callable) -> Optional[str]: +@_sanitize_value.register(type(None)) +def _(value: None) -> None: + return None + + +@_sanitize_value.register(str) +@_sanitize_value.register(int) +@_sanitize_value.register(float) +@_sanitize_value.register(bool) +def _(value: str | int | float | bool) -> str | int | float | bool: + return value + + +@_sanitize_value.register(UUID) +def _(value: UUID) -> str: + return str(value) + + +@_sanitize_value.register(datetime) +def _(value: datetime) -> str: + return value.isoformat() + + +@_sanitize_value.register(list) +@_sanitize_value.register(tuple) +def _(value: list | tuple) -> list: + return [_sanitize_value(v) for v in value] + + +@_sanitize_value.register(dict) +def _(value: dict) -> dict: + sanitized = {} + for k, v in value.items(): + key_str = k if isinstance(k, str) else _sanitize_dict_key(k) + sanitized[key_str] = _sanitize_value(v) + return sanitized + + +def _sanitize_dict_key(key: Any) -> str: + """Convert a non-string dict key to a string.""" + sanitized_key = _sanitize_value(key) + if isinstance(sanitized_key, str): + # If it's a "cannot be serialized" message, use a key-specific message + if sanitized_key.startswith("" + return sanitized_key + return str(sanitized_key) + + +def _get_param_names(func: Callable) -> list[str]: + """Get parameter names from function signature.""" + try: + return list(inspect.signature(func).parameters.keys()) + except Exception: + return [] + + +def _extract_user_id(args: tuple, kwargs: dict, param_names: list[str]) -> Optional[str]: """Extract user_id from function arguments if available.""" try: if "user" in kwargs and kwargs["user"] is not None: @@ -56,8 +91,6 @@ def _extract_user_id(args: tuple, kwargs: dict, func: Callable) -> Optional[str] if hasattr(user, "id"): return str(user.id) - sig = inspect.signature(func) - param_names = list(sig.parameters.keys()) for i, param_name in enumerate(param_names): if i < len(args) and param_name == "user": user = args[i] @@ -68,7 +101,7 @@ def _extract_user_id(args: tuple, kwargs: dict, func: Callable) -> Optional[str] return None -def _extract_parameters(args: tuple, kwargs: dict, func: Callable) -> dict: +def _extract_parameters(args: tuple, kwargs: dict, param_names: list[str]) -> dict: """Extract function parameters - captures all parameters, sanitizes for JSON.""" params = {} @@ -76,16 +109,14 @@ def _extract_parameters(args: tuple, kwargs: dict, func: Callable) -> dict: if key != "user": params[key] = _sanitize_value(value) - try: - sig = inspect.signature(func) - param_names = list(sig.parameters.keys()) + if param_names: for i, param_name in enumerate(param_names): if i < len(args) and param_name != "user" and param_name not in kwargs: params[param_name] = _sanitize_value(args[i]) - except Exception: + else: + # Fallback: capture all args by position if signature inspection fails for i, arg_value in enumerate(args): - if i not in params.values(): - params[f"arg_{i}"] = _sanitize_value(arg_value) + params[f"arg_{i}"] = _sanitize_value(arg_value) return params @@ -184,8 +215,10 @@ def log_usage(function_name: Optional[str] = None, log_type: str = "function"): # Capture start time start_time = datetime.now(timezone.utc) - user_id = _extract_user_id(args, kwargs, func) - parameters = _extract_parameters(args, kwargs, func) + # Get param names once to avoid duplicate signature inspection + param_names = _get_param_names(func) + user_id = _extract_user_id(args, kwargs, param_names) + parameters = _extract_parameters(args, kwargs, param_names) result = None success = True From 8dc358da3972f33c7cf81e7d6bd2afbdd8a50bf8 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 15:21:15 +0100 Subject: [PATCH 12/41] feat: adds default param logging --- cognee/shared/usage_logger.py | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff --git a/cognee/shared/usage_logger.py b/cognee/shared/usage_logger.py index 4a9b1d860..0fad39355 100644 --- a/cognee/shared/usage_logger.py +++ b/cognee/shared/usage_logger.py @@ -68,7 +68,6 @@ def _sanitize_dict_key(key: Any) -> str: """Convert a non-string dict key to a string.""" sanitized_key = _sanitize_value(key) if isinstance(sanitized_key, str): - # If it's a "cannot be serialized" message, use a key-specific message if sanitized_key.startswith("" return sanitized_key @@ -83,6 +82,19 @@ def _get_param_names(func: Callable) -> list[str]: return [] +def _get_param_defaults(func: Callable) -> dict[str, Any]: + """Get parameter defaults from function signature.""" + try: + sig = inspect.signature(func) + defaults = {} + for param_name, param in sig.parameters.items(): + if param.default != inspect.Parameter.empty: + defaults[param_name] = param.default + return defaults + except Exception: + return {} + + def _extract_user_id(args: tuple, kwargs: dict, param_names: list[str]) -> Optional[str]: """Extract user_id from function arguments if available.""" try: @@ -101,8 +113,8 @@ def _extract_user_id(args: tuple, kwargs: dict, param_names: list[str]) -> Optio return None -def _extract_parameters(args: tuple, kwargs: dict, param_names: list[str]) -> dict: - """Extract function parameters - captures all parameters, sanitizes for JSON.""" +def _extract_parameters(args: tuple, kwargs: dict, param_names: list[str], func: Callable) -> dict: + """Extract function parameters - captures all parameters including defaults, sanitizes for JSON.""" params = {} for key, value in kwargs.items(): @@ -114,10 +126,15 @@ def _extract_parameters(args: tuple, kwargs: dict, param_names: list[str]) -> di if i < len(args) and param_name != "user" and param_name not in kwargs: params[param_name] = _sanitize_value(args[i]) else: - # Fallback: capture all args by position if signature inspection fails for i, arg_value in enumerate(args): params[f"arg_{i}"] = _sanitize_value(arg_value) + if param_names: + defaults = _get_param_defaults(func) + for param_name in param_names: + if param_name != "user" and param_name not in params and param_name in defaults: + params[param_name] = _sanitize_value(defaults[param_name]) + return params @@ -212,13 +229,11 @@ def log_usage(function_name: Optional[str] = None, log_type: str = "function"): if not config.usage_logging: return await func(*args, **kwargs) - # Capture start time start_time = datetime.now(timezone.utc) - # Get param names once to avoid duplicate signature inspection param_names = _get_param_names(func) user_id = _extract_user_id(args, kwargs, param_names) - parameters = _extract_parameters(args, kwargs, param_names) + parameters = _extract_parameters(args, kwargs, param_names, func) result = None success = True From 7ebe8563c5b6f0e3edaaabe3a646dec03e9e176c Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 15:21:24 +0100 Subject: [PATCH 13/41] ruff --- cognee/shared/usage_logger.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/cognee/shared/usage_logger.py b/cognee/shared/usage_logger.py index 0fad39355..3456991e0 100644 --- a/cognee/shared/usage_logger.py +++ b/cognee/shared/usage_logger.py @@ -116,11 +116,11 @@ def _extract_user_id(args: tuple, kwargs: dict, param_names: list[str]) -> Optio def _extract_parameters(args: tuple, kwargs: dict, param_names: list[str], func: Callable) -> dict: """Extract function parameters - captures all parameters including defaults, sanitizes for JSON.""" params = {} - + for key, value in kwargs.items(): if key != "user": params[key] = _sanitize_value(value) - + if param_names: for i, param_name in enumerate(param_names): if i < len(args) and param_name != "user" and param_name not in kwargs: @@ -128,13 +128,13 @@ def _extract_parameters(args: tuple, kwargs: dict, param_names: list[str], func: else: for i, arg_value in enumerate(args): params[f"arg_{i}"] = _sanitize_value(arg_value) - + if param_names: defaults = _get_param_defaults(func) for param_name in param_names: if param_name != "user" and param_name not in params and param_name in defaults: params[param_name] = _sanitize_value(defaults[param_name]) - + return params @@ -288,12 +288,13 @@ if __name__ == "__main__": self.id = user_id @log_usage(function_name="example_with_user", log_type="example") - async def example_with_user(data: str, user: MockUser, wrong_param=datetime.utcnow().isoformat()): + async def example_with_user( + data: str, user: MockUser, wrong_param=datetime.utcnow().isoformat() + ): """Example function with user parameter.""" await asyncio.sleep(0.05) return float("nan") - @log_usage(function_name="returns_cycle", log_type="function") async def returns_cycle(): a = [] @@ -314,7 +315,9 @@ if __name__ == "__main__": # Example 2: Function with user print("\n2. Running example function with user:") mock_user = MockUser("example-user-123") - result2 = await example_with_user("sample_data", user=mock_user, wrong_param=datetime.utcnow().isoformat()) + result2 = await example_with_user( + "sample_data", user=mock_user, wrong_param=datetime.utcnow().isoformat() + ) result3 = await example_with_user("sample_data", user=mock_user) print(f" Result: {result2}") @@ -353,7 +356,5 @@ if __name__ == "__main__": print(" - CACHE_HOST=localhost") print(" - CACHE_PORT=6379") - - # Run the example asyncio.run(run_example()) From b83af5f63f4dbf0f765cd7a7bd4a295cc42201be Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 15:58:40 +0100 Subject: [PATCH 14/41] feat: adds new exception to shared usage logger --- cognee/shared/exceptions/__init__.py | 4 +--- cognee/shared/exceptions/exceptions.py | 12 +++++++++++- cognee/shared/usage_logger.py | 14 ++++++-------- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/cognee/shared/exceptions/__init__.py b/cognee/shared/exceptions/__init__.py index 5e2ae6875..776a803b4 100644 --- a/cognee/shared/exceptions/__init__.py +++ b/cognee/shared/exceptions/__init__.py @@ -4,6 +4,4 @@ Custom exceptions for the Cognee API. This module defines a set of exceptions for handling various shared utility errors """ -from .exceptions import ( - IngestionError, -) +from .exceptions import IngestionError, UsageLoggerError diff --git a/cognee/shared/exceptions/exceptions.py b/cognee/shared/exceptions/exceptions.py index 3740f677d..c00a39b9e 100644 --- a/cognee/shared/exceptions/exceptions.py +++ b/cognee/shared/exceptions/exceptions.py @@ -1,4 +1,4 @@ -from cognee.exceptions import CogneeValidationError +from cognee.exceptions import CogneeConfigurationError, CogneeValidationError from fastapi import status @@ -10,3 +10,13 @@ class IngestionError(CogneeValidationError): status_code=status.HTTP_422_UNPROCESSABLE_CONTENT, ): super().__init__(message, name, status_code) + + +class UsageLoggerError(CogneeConfigurationError): + def __init__( + self, + message: str = "Usage logging configuration is invalid.", + name: str = "UsageLoggerError", + status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, + ): + super().__init__(message, name, status_code) diff --git a/cognee/shared/usage_logger.py b/cognee/shared/usage_logger.py index 3456991e0..bf7b50b64 100644 --- a/cognee/shared/usage_logger.py +++ b/cognee/shared/usage_logger.py @@ -8,6 +8,7 @@ from uuid import UUID from cognee.infrastructure.databases.cache.config import get_cache_config from cognee.infrastructure.databases.cache.get_cache_engine import get_cache_engine +from cognee.shared.exceptions import UsageLoggerError from cognee.shared.logging_utils import get_logger from cognee import __version__ as cognee_version @@ -223,6 +224,11 @@ def log_usage(function_name: Optional[str] = None, log_type: str = "function"): """ def decorator(func: Callable) -> Callable: + if not inspect.iscoroutinefunction(func): + raise UsageLoggerError( + f"@log_usage requires an async function. Got {func.__name__} which is not async." + ) + @wraps(func) async def async_wrapper(*args, **kwargs): config = get_cache_config() @@ -295,12 +301,6 @@ if __name__ == "__main__": await asyncio.sleep(0.05) return float("nan") - @log_usage(function_name="returns_cycle", log_type="function") - async def returns_cycle(): - a = [] - a.append(a) - return a - async def run_example(): """Run example demonstrations.""" print("Usage Logger Example") @@ -323,8 +323,6 @@ if __name__ == "__main__": print(f" Result: {result2}") await asyncio.sleep(0.2) # Wait for async logging to complete - await returns_cycle() - # Example 3: Retrieve logs (if cache engine is available) print("\n3. Retrieving usage logs:") try: From 2d5e74ced0a4f82bde0efbda81c615f7d0fc1148 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 16:01:21 +0100 Subject: [PATCH 15/41] fix: fixes codebunny suggestion --- cognee/infrastructure/databases/cache/get_cache_engine.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cognee/infrastructure/databases/cache/get_cache_engine.py b/cognee/infrastructure/databases/cache/get_cache_engine.py index 85a59bfde..b09ac8472 100644 --- a/cognee/infrastructure/databases/cache/get_cache_engine.py +++ b/cognee/infrastructure/databases/cache/get_cache_engine.py @@ -65,7 +65,7 @@ def create_cache_engine( def get_cache_engine( lock_key: Optional[str] = None, log_key: Optional[str] = None -) -> CacheDBInterface: +) -> Optional[CacheDBInterface]: """ Returns a cache adapter instance using current context configuration. """ From bc8c6e8baed651058dc55e5056ba3ef144d67a7c Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 16:02:22 +0100 Subject: [PATCH 16/41] Update usage_logger.py --- cognee/shared/usage_logger.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cognee/shared/usage_logger.py b/cognee/shared/usage_logger.py index bf7b50b64..edd1bebac 100644 --- a/cognee/shared/usage_logger.py +++ b/cognee/shared/usage_logger.py @@ -318,7 +318,7 @@ if __name__ == "__main__": result2 = await example_with_user( "sample_data", user=mock_user, wrong_param=datetime.utcnow().isoformat() ) - result3 = await example_with_user("sample_data", user=mock_user) + await example_with_user("sample_data", user=mock_user) print(f" Result: {result2}") await asyncio.sleep(0.2) # Wait for async logging to complete From c0a7b14ff3a5a9f0947fe697202ac5f49e10bf66 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 16:04:09 +0100 Subject: [PATCH 17/41] feat: adds log usage decorator to main api endpoints --- cognee/api/v1/add/routers/get_add_router.py | 2 ++ cognee/api/v1/cognify/routers/get_cognify_router.py | 2 ++ cognee/api/v1/search/routers/get_search_router.py | 2 ++ 3 files changed, 6 insertions(+) diff --git a/cognee/api/v1/add/routers/get_add_router.py b/cognee/api/v1/add/routers/get_add_router.py index 39dc1a3e6..96c716eec 100644 --- a/cognee/api/v1/add/routers/get_add_router.py +++ b/cognee/api/v1/add/routers/get_add_router.py @@ -10,6 +10,7 @@ from cognee.modules.users.methods import get_authenticated_user from cognee.shared.utils import send_telemetry from cognee.modules.pipelines.models import PipelineRunErrored from cognee.shared.logging_utils import get_logger +from cognee.shared.usage_logger import log_usage from cognee import __version__ as cognee_version logger = get_logger() @@ -19,6 +20,7 @@ def get_add_router() -> APIRouter: router = APIRouter() @router.post("", response_model=dict) + @log_usage(function_name="POST /v1/add", log_type="api_endpoint") async def add( data: List[UploadFile] = File(default=None), datasetName: Optional[str] = Form(default=None), diff --git a/cognee/api/v1/cognify/routers/get_cognify_router.py b/cognee/api/v1/cognify/routers/get_cognify_router.py index a499b3ca3..b2f4e5491 100644 --- a/cognee/api/v1/cognify/routers/get_cognify_router.py +++ b/cognee/api/v1/cognify/routers/get_cognify_router.py @@ -29,6 +29,7 @@ from cognee.modules.pipelines.queues.pipeline_run_info_queues import ( ) from cognee.shared.logging_utils import get_logger from cognee.shared.utils import send_telemetry +from cognee.shared.usage_logger import log_usage from cognee import __version__ as cognee_version logger = get_logger("api.cognify") @@ -52,6 +53,7 @@ def get_cognify_router() -> APIRouter: router = APIRouter() @router.post("", response_model=dict) + @log_usage(function_name="POST /v1/cognify", log_type="api_endpoint") async def cognify(payload: CognifyPayloadDTO, user: User = Depends(get_authenticated_user)): """ Transform datasets into structured knowledge graphs through cognitive processing. diff --git a/cognee/api/v1/search/routers/get_search_router.py b/cognee/api/v1/search/routers/get_search_router.py index 26327628e..c4284bb8b 100644 --- a/cognee/api/v1/search/routers/get_search_router.py +++ b/cognee/api/v1/search/routers/get_search_router.py @@ -13,6 +13,7 @@ from cognee.modules.users.models import User from cognee.modules.search.operations import get_history from cognee.modules.users.methods import get_authenticated_user from cognee.shared.utils import send_telemetry +from cognee.shared.usage_logger import log_usage from cognee import __version__ as cognee_version from cognee.infrastructure.databases.exceptions import DatabaseNotCreatedError from cognee.exceptions import CogneeValidationError @@ -75,6 +76,7 @@ def get_search_router() -> APIRouter: return JSONResponse(status_code=500, content={"error": str(error)}) @router.post("", response_model=Union[List[SearchResult], List]) + @log_usage(function_name="POST /v1/search", log_type="api_endpoint") async def search(payload: SearchPayloadDTO, user: User = Depends(get_authenticated_user)): """ Search for nodes in the graph database. From 7773e811b0865fd7935867626691738874497086 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 16:31:38 +0100 Subject: [PATCH 18/41] chore: cleaning and adding correct defaults --- .../databases/cache/get_cache_engine.py | 3 +- cognee/shared/usage_logger.py | 90 ++----------------- 2 files changed, 8 insertions(+), 85 deletions(-) diff --git a/cognee/infrastructure/databases/cache/get_cache_engine.py b/cognee/infrastructure/databases/cache/get_cache_engine.py index b09ac8472..b7d015278 100644 --- a/cognee/infrastructure/databases/cache/get_cache_engine.py +++ b/cognee/infrastructure/databases/cache/get_cache_engine.py @@ -64,7 +64,8 @@ def create_cache_engine( def get_cache_engine( - lock_key: Optional[str] = None, log_key: Optional[str] = None + lock_key: Optional[str] = "default_lock", + log_key: Optional[str] = "usage_logs", ) -> Optional[CacheDBInterface]: """ Returns a cache adapter instance using current context configuration. diff --git a/cognee/shared/usage_logger.py b/cognee/shared/usage_logger.py index edd1bebac..33d2dac10 100644 --- a/cognee/shared/usage_logger.py +++ b/cognee/shared/usage_logger.py @@ -160,7 +160,7 @@ async def _log_usage_async( return logger.debug(f"Getting cache engine for {function_name}") - cache_engine = get_cache_engine(lock_key=None, log_key="usage_logging") + cache_engine = get_cache_engine() if cache_engine is None: logger.warning( f"Cache engine not available for usage logging (function: {function_name})" @@ -211,14 +211,14 @@ def log_usage(function_name: Optional[str] = None, log_type: str = "function"): Args: function_name: Optional name for the function (defaults to func.__name__) - log_type: Type of log entry (e.g., "api_endpoint", "mcp_tool", "function") + log_type: Type of log entry (e.g., "api_endpoint", "mcp_tool") Usage: - @log_usage() - async def my_function(...): - # function code + @log_usage(function_name="MCP my_mcp_tool", log_type="mcp_tool") + async def my_mcp_tool(...): + # mcp code - @log_usage(function_name="POST /v1/add", log_type="api_endpoint") + @log_usage(function_name="POST API /v1/add", log_type="api_endpoint") async def add(...): # endpoint code """ @@ -278,81 +278,3 @@ def log_usage(function_name: Optional[str] = None, log_type: str = "function"): return async_wrapper return decorator - - -if __name__ == "__main__": - # Example 1: Simple function with decorator - @log_usage(function_name="example_function", log_type="example") - async def example_function(param1: str, param2: int, user=None): - """Example function to demonstrate usage logging.""" - await asyncio.sleep(0.1) # Simulate some work - return {(1, 2): "ok"} - - # Example 2: Function with user parameter - class MockUser: - def __init__(self, user_id: str): - self.id = user_id - - @log_usage(function_name="example_with_user", log_type="example") - async def example_with_user( - data: str, user: MockUser, wrong_param=datetime.utcnow().isoformat() - ): - """Example function with user parameter.""" - await asyncio.sleep(0.05) - return float("nan") - - async def run_example(): - """Run example demonstrations.""" - print("Usage Logger Example") - print("=" * 50) - - # Example 1: Simple function - print("\n1. Running example function:") - result1 = await example_function("example_data", 42) - print(f" Result: {result1}") - await asyncio.sleep(0.2) # Wait for async logging to complete - - # Example 2: Function with user - print("\n2. Running example function with user:") - mock_user = MockUser("example-user-123") - result2 = await example_with_user( - "sample_data", user=mock_user, wrong_param=datetime.utcnow().isoformat() - ) - await example_with_user("sample_data", user=mock_user) - - print(f" Result: {result2}") - await asyncio.sleep(0.2) # Wait for async logging to complete - - # Example 3: Retrieve logs (if cache engine is available) - print("\n3. Retrieving usage logs:") - try: - config = get_cache_config() - if config.usage_logging: - cache_engine = get_cache_engine(lock_key="usage_logging") - if cache_engine: - # Get logs for the user - user_id = str(mock_user.id) - logs = await cache_engine.get_usage_logs(user_id, limit=10) - print(f" Found {len(logs)} log entries for user {user_id}") - if logs: - print( - f" Latest log: {logs[0]['function_name']} at {logs[0]['timestamp']}" - ) - else: - print(" Cache engine not available") - else: - print(" Usage logging is disabled (set USAGE_LOGGING=true)") - except Exception as e: - print(f" Error retrieving logs: {str(e)}") - - print("\n" + "=" * 50) - print("Example completed!") - print("\nNote: Make sure to set these environment variables:") - print(" - USAGE_LOGGING=true") - print(" - CACHING=true (or ensure cache backend is configured)") - print(" - CACHE_BACKEND=redis (or fs)") - print(" - CACHE_HOST=localhost") - print(" - CACHE_PORT=6379") - - # Run the example - asyncio.run(run_example()) From e17ca5ac595747408dd720936a2d4e3780d1c437 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 17:35:06 +0100 Subject: [PATCH 19/41] feat: adds logging to memify endpoint --- cognee/api/v1/memify/routers/get_memify_router.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cognee/api/v1/memify/routers/get_memify_router.py b/cognee/api/v1/memify/routers/get_memify_router.py index c63e4a394..0e54d7508 100644 --- a/cognee/api/v1/memify/routers/get_memify_router.py +++ b/cognee/api/v1/memify/routers/get_memify_router.py @@ -12,6 +12,7 @@ from cognee.modules.users.methods import get_authenticated_user from cognee.shared.utils import send_telemetry from cognee.modules.pipelines.models import PipelineRunErrored from cognee.shared.logging_utils import get_logger +from cognee.shared.usage_logger import log_usage from cognee import __version__ as cognee_version logger = get_logger() @@ -35,6 +36,7 @@ def get_memify_router() -> APIRouter: router = APIRouter() @router.post("", response_model=dict) + @log_usage(function_name="POST /v1/memify", log_type="api_endpoint") async def memify(payload: MemifyPayloadDTO, user: User = Depends(get_authenticated_user)): """ Enrichment pipeline in Cognee, can work with already built graphs. If no data is provided existing knowledge graph will be used as data, From abb45c65d7f13719bc1cfba49dfd65dd34c81511 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Thu, 15 Jan 2026 18:03:47 +0100 Subject: [PATCH 20/41] feat: adding and fixing mcp tool logging --- cognee-mcp/src/server.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/cognee-mcp/src/server.py b/cognee-mcp/src/server.py index c02de06c8..5a0515436 100755 --- a/cognee-mcp/src/server.py +++ b/cognee-mcp/src/server.py @@ -8,6 +8,7 @@ from pathlib import Path from typing import Optional from cognee.shared.logging_utils import get_logger, setup_logging, get_log_file_location +from cognee.shared.usage_logger import log_usage import importlib.util from contextlib import redirect_stdout import mcp.types as types @@ -91,6 +92,7 @@ async def health_check(request): @mcp.tool() +@log_usage(function_name="MCP cognify", log_type="mcp_tool") async def cognify( data: str, graph_model_file: str = None, graph_model_name: str = None, custom_prompt: str = None ) -> list: @@ -257,6 +259,7 @@ async def cognify( @mcp.tool( name="save_interaction", description="Logs user-agent interactions and query-answer pairs" ) +@log_usage(function_name="MCP save_interaction", log_type="mcp_tool") async def save_interaction(data: str) -> list: """ Transform and save a user-agent interaction into structured knowledge. @@ -316,6 +319,7 @@ async def save_interaction(data: str) -> list: @mcp.tool() +@log_usage(function_name="MCP search", log_type="mcp_tool") async def search(search_query: str, search_type: str, top_k: int = 10) -> list: """ Search and query the knowledge graph for insights, information, and connections. @@ -496,6 +500,7 @@ async def search(search_query: str, search_type: str, top_k: int = 10) -> list: @mcp.tool() +@log_usage(function_name="MCP list_data", log_type="mcp_tool") async def list_data(dataset_id: str = None) -> list: """ List all datasets and their data items with IDs for deletion operations. @@ -624,6 +629,7 @@ async def list_data(dataset_id: str = None) -> list: @mcp.tool() +@log_usage(function_name="MCP delete", log_type="mcp_tool") async def delete(data_id: str, dataset_id: str, mode: str = "soft") -> list: """ Delete specific data from a dataset in the Cognee knowledge graph. @@ -703,6 +709,7 @@ async def delete(data_id: str, dataset_id: str, mode: str = "soft") -> list: @mcp.tool() +@log_usage(function_name="MCP prune", log_type="mcp_tool") async def prune(): """ Reset the Cognee knowledge graph by removing all stored information. @@ -739,6 +746,7 @@ async def prune(): @mcp.tool() +@log_usage(function_name="MCP cognify_status", log_type="mcp_tool") async def cognify_status(): """ Get the current status of the cognify pipeline. From 2ad8bcf6e984dd9f873ab9e640b2fadcc16a9395 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 16 Jan 2026 12:59:45 +0100 Subject: [PATCH 21/41] feat: adds unit test for usage logging --- cognee/tests/unit/shared/test_usage_logger.py | 216 ++++++++++++++++++ 1 file changed, 216 insertions(+) create mode 100644 cognee/tests/unit/shared/test_usage_logger.py diff --git a/cognee/tests/unit/shared/test_usage_logger.py b/cognee/tests/unit/shared/test_usage_logger.py new file mode 100644 index 000000000..54b49454c --- /dev/null +++ b/cognee/tests/unit/shared/test_usage_logger.py @@ -0,0 +1,216 @@ +"""Unit tests for usage logger core functions.""" + +import pytest +from datetime import datetime, timezone +from uuid import UUID +from types import SimpleNamespace + +from cognee.shared.usage_logger import ( + _sanitize_value, + _sanitize_dict_key, + _get_param_names, + _get_param_defaults, + _extract_user_id, + _extract_parameters, + log_usage, +) +from cognee.shared.exceptions import UsageLoggerError + + +class TestSanitizeValue: + """Test _sanitize_value function.""" + + @pytest.mark.parametrize("value,expected", [ + (None, None), + ("string", "string"), + (42, 42), + (3.14, 3.14), + (True, True), + (False, False), + ]) + def test_basic_types(self, value, expected): + assert _sanitize_value(value) == expected + + def test_uuid_and_datetime(self): + """Test UUID and datetime serialization.""" + uuid_val = UUID("123e4567-e89b-12d3-a456-426614174000") + dt = datetime(2024, 1, 15, 12, 30, 45, tzinfo=timezone.utc) + + assert _sanitize_value(uuid_val) == "123e4567-e89b-12d3-a456-426614174000" + assert _sanitize_value(dt) == "2024-01-15T12:30:45+00:00" + + def test_collections(self): + """Test list, tuple, and dict serialization.""" + assert _sanitize_value([1, "string", UUID("123e4567-e89b-12d3-a456-426614174000"), None]) == [ + 1, "string", "123e4567-e89b-12d3-a456-426614174000", None + ] + assert _sanitize_value((1, "string", True)) == [1, "string", True] + assert _sanitize_value({"key": UUID("123e4567-e89b-12d3-a456-426614174000")}) == { + "key": "123e4567-e89b-12d3-a456-426614174000" + } + assert _sanitize_value([]) == [] + assert _sanitize_value({}) == {} + + def test_nested_and_complex(self): + """Test nested structures and non-serializable types.""" + # Nested structure + nested = {"level1": {"level2": {"level3": [1, 2, {"nested": "value"}]}}} + assert _sanitize_value(nested)["level1"]["level2"]["level3"][2]["nested"] == "value" + + # Non-serializable + class CustomObject: + def __str__(self): + return "" + + result = _sanitize_value(CustomObject()) + assert isinstance(result, str) + assert "" + + result = _sanitize_dict_key(BadKey()) + assert isinstance(result, str) + assert " Date: Fri, 16 Jan 2026 13:01:46 +0100 Subject: [PATCH 22/41] ruff ruff --- cognee/tests/unit/shared/test_usage_logger.py | 113 +++++++++++------- 1 file changed, 69 insertions(+), 44 deletions(-) diff --git a/cognee/tests/unit/shared/test_usage_logger.py b/cognee/tests/unit/shared/test_usage_logger.py index 54b49454c..fe4ebb15c 100644 --- a/cognee/tests/unit/shared/test_usage_logger.py +++ b/cognee/tests/unit/shared/test_usage_logger.py @@ -20,14 +20,17 @@ from cognee.shared.exceptions import UsageLoggerError class TestSanitizeValue: """Test _sanitize_value function.""" - @pytest.mark.parametrize("value,expected", [ - (None, None), - ("string", "string"), - (42, 42), - (3.14, 3.14), - (True, True), - (False, False), - ]) + @pytest.mark.parametrize( + "value,expected", + [ + (None, None), + ("string", "string"), + (42, 42), + (3.14, 3.14), + (True, True), + (False, False), + ], + ) def test_basic_types(self, value, expected): assert _sanitize_value(value) == expected @@ -35,15 +38,15 @@ class TestSanitizeValue: """Test UUID and datetime serialization.""" uuid_val = UUID("123e4567-e89b-12d3-a456-426614174000") dt = datetime(2024, 1, 15, 12, 30, 45, tzinfo=timezone.utc) - + assert _sanitize_value(uuid_val) == "123e4567-e89b-12d3-a456-426614174000" assert _sanitize_value(dt) == "2024-01-15T12:30:45+00:00" def test_collections(self): """Test list, tuple, and dict serialization.""" - assert _sanitize_value([1, "string", UUID("123e4567-e89b-12d3-a456-426614174000"), None]) == [ - 1, "string", "123e4567-e89b-12d3-a456-426614174000", None - ] + assert _sanitize_value( + [1, "string", UUID("123e4567-e89b-12d3-a456-426614174000"), None] + ) == [1, "string", "123e4567-e89b-12d3-a456-426614174000", None] assert _sanitize_value((1, "string", True)) == [1, "string", True] assert _sanitize_value({"key": UUID("123e4567-e89b-12d3-a456-426614174000")}) == { "key": "123e4567-e89b-12d3-a456-426614174000" @@ -56,12 +59,12 @@ class TestSanitizeValue: # Nested structure nested = {"level1": {"level2": {"level3": [1, 2, {"nested": "value"}]}}} assert _sanitize_value(nested)["level1"]["level2"]["level3"][2]["nested"] == "value" - + # Non-serializable class CustomObject: def __str__(self): return "" - + result = _sanitize_value(CustomObject()) assert isinstance(result, str) assert "" - + result = _sanitize_dict_key(BadKey()) assert isinstance(result, str) assert " Date: Fri, 16 Jan 2026 16:11:53 +0100 Subject: [PATCH 23/41] feat: adds integration test for usage logger --- .../shared/test_usage_logger_integration.py | 348 ++++++++++++++++++ 1 file changed, 348 insertions(+) create mode 100644 cognee/tests/integration/shared/test_usage_logger_integration.py diff --git a/cognee/tests/integration/shared/test_usage_logger_integration.py b/cognee/tests/integration/shared/test_usage_logger_integration.py new file mode 100644 index 000000000..05d19c6fa --- /dev/null +++ b/cognee/tests/integration/shared/test_usage_logger_integration.py @@ -0,0 +1,348 @@ +import os +import pytest +import asyncio +from datetime import datetime, timezone +from types import SimpleNamespace +from uuid import UUID +from cognee.shared.usage_logger import log_usage +from cognee.infrastructure.databases.cache.config import get_cache_config +from cognee.infrastructure.databases.cache.get_cache_engine import get_cache_engine, create_cache_engine + + +@pytest.fixture +def usage_logging_config(): + """Fixture to enable usage logging via environment variables.""" + original_env = os.environ.copy() + os.environ["USAGE_LOGGING"] = "true" + os.environ["CACHE_BACKEND"] = "redis" + os.environ["CACHE_HOST"] = "localhost" + os.environ["CACHE_PORT"] = "6379" + get_cache_config.cache_clear() + create_cache_engine.cache_clear() + yield + os.environ.clear() + os.environ.update(original_env) + get_cache_config.cache_clear() + create_cache_engine.cache_clear() + + +@pytest.fixture +def usage_logging_disabled(): + """Fixture to disable usage logging via environment variables.""" + original_env = os.environ.copy() + os.environ["USAGE_LOGGING"] = "false" + os.environ["CACHE_BACKEND"] = "redis" + get_cache_config.cache_clear() + create_cache_engine.cache_clear() + yield + os.environ.clear() + os.environ.update(original_env) + get_cache_config.cache_clear() + create_cache_engine.cache_clear() + + +@pytest.fixture +def redis_adapter(): + """Real RedisAdapter instance for testing.""" + from cognee.infrastructure.databases.cache.redis.RedisAdapter import RedisAdapter + + try: + adapter = RedisAdapter(host="localhost", port=6379, log_key="test_usage_logs") + yield adapter + except Exception as e: + pytest.skip(f"Redis not available: {e}") + + +@pytest.fixture +def test_user(): + """Test user object.""" + return SimpleNamespace(id="test-user-123") + + +class TestDecoratorBehavior: + """Test decorator behavior with real components.""" + + @pytest.mark.asyncio + async def test_decorator_skips_when_disabled(self, usage_logging_disabled): + """Test decorator skips logging when usage_logging=False.""" + call_count = 0 + + @log_usage(function_name="test_func", log_type="test") + async def test_func(): + nonlocal call_count + call_count += 1 + return "result" + + assert await test_func() == "result" + assert call_count == 1 + + @pytest.mark.asyncio + async def test_decorator_basic_logging( + self, usage_logging_config, redis_adapter, test_user + ): + """Test decorator logs to Redis and handles various scenarios.""" + from unittest.mock import patch + + @log_usage(function_name="test_func", log_type="test") + async def test_func(param1: str, param2: int = 42, user=None): + await asyncio.sleep(0.01) + return {"result": f"{param1}_{param2}"} + + with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: + mock_get.return_value = redis_adapter + + # Test basic logging + result = await test_func("value1", user=test_user) + assert result == {"result": "value1_42"} + + logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) + assert len(logs) > 0 + log = logs[0] + assert log["function_name"] == "test_func" + assert log["type"] == "test" + assert log["user_id"] == "test-user-123" + assert log["parameters"]["param1"] == "value1" + assert log["parameters"]["param2"] == 42 + assert log["success"] is True + + # Test log entry structure + required_fields = [ + "timestamp", "type", "function_name", "user_id", "parameters", + "result", "success", "error", "duration_ms", "start_time", "end_time", "metadata" + ] + for field in required_fields: + assert field in log + assert "cognee_version" in log["metadata"] + assert "environment" in log["metadata"] + + @pytest.mark.asyncio + async def test_decorator_handles_cache_engine_none(self, usage_logging_config): + """Test decorator handles gracefully when cache engine is None.""" + from unittest.mock import patch + + @log_usage(function_name="test_func", log_type="test") + async def test_func(): + return "result" + + with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: + mock_get.return_value = None + assert await test_func() == "result" + + @pytest.mark.asyncio + async def test_success_and_failure_logging( + self, usage_logging_config, redis_adapter, test_user + ): + """Test successful and failed execution logging.""" + from unittest.mock import patch + + @log_usage(function_name="success_test", log_type="test") + async def success_func(data: str, user=None): + await asyncio.sleep(0.01) + return {"status": "success", "data": data} + + @log_usage(function_name="fail_test", log_type="test") + async def fail_func(user=None): + raise ValueError("Test error") + + with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: + mock_get.return_value = redis_adapter + + # Test success + result = await success_func("test_data", user=test_user) + assert result == {"status": "success", "data": "test_data"} + + logs = await redis_adapter.get_usage_logs("test-user-123", limit=2) + success_log = logs[0] + assert success_log["success"] is True + assert success_log["error"] is None + assert success_log["result"]["status"] == "success" + assert success_log["duration_ms"] > 0 + + # Test failure + with pytest.raises(ValueError, match="Test error"): + await fail_func(user=test_user) + + logs = await redis_adapter.get_usage_logs("test-user-123", limit=2) + fail_log = logs[0] + assert fail_log["success"] is False + assert fail_log["error"] == "Test error" + + @pytest.mark.asyncio + async def test_timing_and_multiple_calls( + self, usage_logging_config, redis_adapter, test_user + ): + """Test timing accuracy and multiple consecutive calls.""" + from unittest.mock import patch + + @log_usage(function_name="timing_test", log_type="test") + async def timing_func(user=None): + await asyncio.sleep(0.1) + return "done" + + @log_usage(function_name="multi_test", log_type="test") + async def multi_func(call_num: int, user=None): + return {"call": call_num} + + with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: + mock_get.return_value = redis_adapter + + # Test timing + await timing_func(user=test_user) + logs = await redis_adapter.get_usage_logs("test-user-123", limit=1) + assert 50 <= logs[0]["duration_ms"] <= 200 + + # Test multiple calls + for i in range(3): + await multi_func(i, user=test_user) + + logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) + assert len(logs) >= 3 + call_nums = [log["parameters"]["call_num"] for log in logs[:3]] + assert set(call_nums) == {0, 1, 2} + + +class TestRealRedisIntegration: + """Test real Redis integration.""" + + @pytest.mark.asyncio + async def test_redis_storage_and_retrieval( + self, usage_logging_config, redis_adapter, test_user + ): + """Test logs are stored in Redis and can be retrieved with correct order and limits.""" + from unittest.mock import patch + + @log_usage(function_name="redis_test", log_type="test") + async def redis_func(data: str, user=None): + return {"processed": data} + + @log_usage(function_name="order_test", log_type="test") + async def order_func(num: int, user=None): + return {"num": num} + + with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: + mock_get.return_value = redis_adapter + + # Test storage + await redis_func("test_data", user=test_user) + logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) + assert len(logs) > 0 + assert logs[0]["function_name"] == "redis_test" + assert logs[0]["parameters"]["data"] == "test_data" + + # Test order (most recent first) + for i in range(3): + await order_func(i, user=test_user) + await asyncio.sleep(0.01) + + logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) + assert logs[0]["parameters"]["num"] == 2 + assert logs[1]["parameters"]["num"] == 1 + assert logs[2]["parameters"]["num"] == 0 + + # Test limit parameter + logs = await redis_adapter.get_usage_logs("test-user-123", limit=2) + assert len(logs) == 2 + + @pytest.mark.asyncio + async def test_ttl_set_correctly( + self, usage_logging_config, redis_adapter, test_user + ): + """Test that TTL is set correctly on Redis keys.""" + from unittest.mock import patch + + @log_usage(function_name="ttl_test", log_type="test") + async def ttl_func(user=None): + return "result" + + with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: + mock_get.return_value = redis_adapter + + await ttl_func(user=test_user) + + key = f"test_usage_logs:test-user-123" + ttl = await redis_adapter.async_redis.ttl(key) + assert ttl > 0 + assert ttl <= 604800 + + +class TestEdgeCases: + """Test edge cases in integration tests.""" + + @pytest.mark.asyncio + async def test_edge_cases( + self, usage_logging_config, redis_adapter, test_user + ): + """Test various edge cases: no params, defaults, complex structures, exceptions, None, circular refs.""" + from unittest.mock import patch + + @log_usage(function_name="no_params", log_type="test") + async def no_params_func(user=None): + return "result" + + @log_usage(function_name="defaults_only", log_type="test") + async def defaults_only_func(param1: str = "default1", param2: int = 42, user=None): + return {"param1": param1, "param2": param2} + + @log_usage(function_name="complex_test", log_type="test") + async def complex_func(user=None): + return { + "nested": { + "list": [1, 2, 3], + "uuid": UUID("123e4567-e89b-12d3-a456-426614174000"), + "datetime": datetime(2024, 1, 15, tzinfo=timezone.utc), + } + } + + @log_usage(function_name="exception_test", log_type="test") + async def exception_func(user=None): + raise RuntimeError("Test exception") + + @log_usage(function_name="none_test", log_type="test") + async def none_func(user=None): + return None + + @log_usage(function_name="circular_test", log_type="test") + async def circular_func(user=None): + a = [] + a.append(a) + return a + + with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: + mock_get.return_value = redis_adapter + + # No parameters + await no_params_func(user=test_user) + logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) + assert logs[0]["parameters"] == {} + + # Default parameters + await defaults_only_func(user=test_user) + logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) + assert logs[0]["parameters"]["param1"] == "default1" + assert logs[0]["parameters"]["param2"] == 42 + + # Complex nested structures + result = await complex_func(user=test_user) + logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) + assert "nested" in logs[0]["result"] + assert isinstance(logs[0]["result"]["nested"]["uuid"], str) + assert isinstance(logs[0]["result"]["nested"]["datetime"], str) + + # Exception handling + with pytest.raises(RuntimeError): + await exception_func(user=test_user) + logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) + assert logs[0]["success"] is False + assert "Test exception" in logs[0]["error"] + + # None return value + result = await none_func(user=test_user) + assert result is None + logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) + assert logs[0]["result"] is None + + # Circular reference + result = await circular_func(user=test_user) + assert isinstance(result, list) + logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) + assert "result" in logs[0] From 3aadb91a6ffc5b6f07008789c2f564b792fbcc82 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 16 Jan 2026 16:13:22 +0100 Subject: [PATCH 24/41] ruff ruff --- .../shared/test_usage_logger_integration.py | 35 +++++++++++-------- 1 file changed, 20 insertions(+), 15 deletions(-) diff --git a/cognee/tests/integration/shared/test_usage_logger_integration.py b/cognee/tests/integration/shared/test_usage_logger_integration.py index 05d19c6fa..66b6fd2a6 100644 --- a/cognee/tests/integration/shared/test_usage_logger_integration.py +++ b/cognee/tests/integration/shared/test_usage_logger_integration.py @@ -6,7 +6,10 @@ from types import SimpleNamespace from uuid import UUID from cognee.shared.usage_logger import log_usage from cognee.infrastructure.databases.cache.config import get_cache_config -from cognee.infrastructure.databases.cache.get_cache_engine import get_cache_engine, create_cache_engine +from cognee.infrastructure.databases.cache.get_cache_engine import ( + get_cache_engine, + create_cache_engine, +) @pytest.fixture @@ -77,9 +80,7 @@ class TestDecoratorBehavior: assert call_count == 1 @pytest.mark.asyncio - async def test_decorator_basic_logging( - self, usage_logging_config, redis_adapter, test_user - ): + async def test_decorator_basic_logging(self, usage_logging_config, redis_adapter, test_user): """Test decorator logs to Redis and handles various scenarios.""" from unittest.mock import patch @@ -107,8 +108,18 @@ class TestDecoratorBehavior: # Test log entry structure required_fields = [ - "timestamp", "type", "function_name", "user_id", "parameters", - "result", "success", "error", "duration_ms", "start_time", "end_time", "metadata" + "timestamp", + "type", + "function_name", + "user_id", + "parameters", + "result", + "success", + "error", + "duration_ms", + "start_time", + "end_time", + "metadata", ] for field in required_fields: assert field in log @@ -168,9 +179,7 @@ class TestDecoratorBehavior: assert fail_log["error"] == "Test error" @pytest.mark.asyncio - async def test_timing_and_multiple_calls( - self, usage_logging_config, redis_adapter, test_user - ): + async def test_timing_and_multiple_calls(self, usage_logging_config, redis_adapter, test_user): """Test timing accuracy and multiple consecutive calls.""" from unittest.mock import patch @@ -244,9 +253,7 @@ class TestRealRedisIntegration: assert len(logs) == 2 @pytest.mark.asyncio - async def test_ttl_set_correctly( - self, usage_logging_config, redis_adapter, test_user - ): + async def test_ttl_set_correctly(self, usage_logging_config, redis_adapter, test_user): """Test that TTL is set correctly on Redis keys.""" from unittest.mock import patch @@ -269,9 +276,7 @@ class TestEdgeCases: """Test edge cases in integration tests.""" @pytest.mark.asyncio - async def test_edge_cases( - self, usage_logging_config, redis_adapter, test_user - ): + async def test_edge_cases(self, usage_logging_config, redis_adapter, test_user): """Test various edge cases: no params, defaults, complex structures, exceptions, None, circular refs.""" from unittest.mock import patch From 4e1e3dcfb9943fc328415fe861fcb52d256c3383 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 16 Jan 2026 16:18:18 +0100 Subject: [PATCH 25/41] chore: makes integration test a bit cleaner --- .../shared/test_usage_logger_integration.py | 190 +++++------------- 1 file changed, 46 insertions(+), 144 deletions(-) diff --git a/cognee/tests/integration/shared/test_usage_logger_integration.py b/cognee/tests/integration/shared/test_usage_logger_integration.py index 66b6fd2a6..6bdd58e3c 100644 --- a/cognee/tests/integration/shared/test_usage_logger_integration.py +++ b/cognee/tests/integration/shared/test_usage_logger_integration.py @@ -1,9 +1,13 @@ +"""Integration tests for usage logger with real Redis components.""" + import os import pytest import asyncio from datetime import datetime, timezone from types import SimpleNamespace from uuid import UUID +from unittest.mock import patch + from cognee.shared.usage_logger import log_usage from cognee.infrastructure.databases.cache.config import get_cache_config from cognee.infrastructure.databases.cache.get_cache_engine import ( @@ -50,8 +54,7 @@ def redis_adapter(): from cognee.infrastructure.databases.cache.redis.RedisAdapter import RedisAdapter try: - adapter = RedisAdapter(host="localhost", port=6379, log_key="test_usage_logs") - yield adapter + yield RedisAdapter(host="localhost", port=6379, log_key="test_usage_logs") except Exception as e: pytest.skip(f"Redis not available: {e}") @@ -66,8 +69,11 @@ class TestDecoratorBehavior: """Test decorator behavior with real components.""" @pytest.mark.asyncio - async def test_decorator_skips_when_disabled(self, usage_logging_disabled): - """Test decorator skips logging when usage_logging=False.""" + async def test_decorator_configuration( + self, usage_logging_disabled, usage_logging_config, redis_adapter + ): + """Test decorator skips when disabled and logs when enabled.""" + # Test disabled call_count = 0 @log_usage(function_name="test_func", log_type="test") @@ -79,10 +85,14 @@ class TestDecoratorBehavior: assert await test_func() == "result" assert call_count == 1 + # Test enabled with cache engine None + with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: + mock_get.return_value = None + assert await test_func() == "result" + @pytest.mark.asyncio - async def test_decorator_basic_logging(self, usage_logging_config, redis_adapter, test_user): - """Test decorator logs to Redis and handles various scenarios.""" - from unittest.mock import patch + async def test_decorator_logging(self, usage_logging_config, redis_adapter, test_user): + """Test decorator logs to Redis with correct structure.""" @log_usage(function_name="test_func", log_type="test") async def test_func(param1: str, param2: int = 42, user=None): @@ -92,12 +102,10 @@ class TestDecoratorBehavior: with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: mock_get.return_value = redis_adapter - # Test basic logging result = await test_func("value1", user=test_user) assert result == {"result": "value1_42"} logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) - assert len(logs) > 0 log = logs[0] assert log["function_name"] == "test_func" assert log["type"] == "test" @@ -105,88 +113,23 @@ class TestDecoratorBehavior: assert log["parameters"]["param1"] == "value1" assert log["parameters"]["param2"] == 42 assert log["success"] is True - - # Test log entry structure - required_fields = [ - "timestamp", - "type", - "function_name", - "user_id", - "parameters", - "result", - "success", - "error", - "duration_ms", - "start_time", - "end_time", - "metadata", - ] - for field in required_fields: - assert field in log + assert all( + field in log + for field in [ + "timestamp", + "result", + "error", + "duration_ms", + "start_time", + "end_time", + "metadata", + ] + ) assert "cognee_version" in log["metadata"] - assert "environment" in log["metadata"] @pytest.mark.asyncio - async def test_decorator_handles_cache_engine_none(self, usage_logging_config): - """Test decorator handles gracefully when cache engine is None.""" - from unittest.mock import patch - - @log_usage(function_name="test_func", log_type="test") - async def test_func(): - return "result" - - with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: - mock_get.return_value = None - assert await test_func() == "result" - - @pytest.mark.asyncio - async def test_success_and_failure_logging( - self, usage_logging_config, redis_adapter, test_user - ): - """Test successful and failed execution logging.""" - from unittest.mock import patch - - @log_usage(function_name="success_test", log_type="test") - async def success_func(data: str, user=None): - await asyncio.sleep(0.01) - return {"status": "success", "data": data} - - @log_usage(function_name="fail_test", log_type="test") - async def fail_func(user=None): - raise ValueError("Test error") - - with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: - mock_get.return_value = redis_adapter - - # Test success - result = await success_func("test_data", user=test_user) - assert result == {"status": "success", "data": "test_data"} - - logs = await redis_adapter.get_usage_logs("test-user-123", limit=2) - success_log = logs[0] - assert success_log["success"] is True - assert success_log["error"] is None - assert success_log["result"]["status"] == "success" - assert success_log["duration_ms"] > 0 - - # Test failure - with pytest.raises(ValueError, match="Test error"): - await fail_func(user=test_user) - - logs = await redis_adapter.get_usage_logs("test-user-123", limit=2) - fail_log = logs[0] - assert fail_log["success"] is False - assert fail_log["error"] == "Test error" - - @pytest.mark.asyncio - async def test_timing_and_multiple_calls(self, usage_logging_config, redis_adapter, test_user): - """Test timing accuracy and multiple consecutive calls.""" - from unittest.mock import patch - - @log_usage(function_name="timing_test", log_type="test") - async def timing_func(user=None): - await asyncio.sleep(0.1) - return "done" + async def test_multiple_calls(self, usage_logging_config, redis_adapter, test_user): + """Test multiple consecutive calls are all logged.""" @log_usage(function_name="multi_test", log_type="test") async def multi_func(call_num: int, user=None): @@ -195,30 +138,23 @@ class TestDecoratorBehavior: with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: mock_get.return_value = redis_adapter - # Test timing - await timing_func(user=test_user) - logs = await redis_adapter.get_usage_logs("test-user-123", limit=1) - assert 50 <= logs[0]["duration_ms"] <= 200 - - # Test multiple calls for i in range(3): await multi_func(i, user=test_user) logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) assert len(logs) >= 3 - call_nums = [log["parameters"]["call_num"] for log in logs[:3]] - assert set(call_nums) == {0, 1, 2} + call_nums = {log["parameters"]["call_num"] for log in logs[:3]} + assert call_nums == {0, 1, 2} class TestRealRedisIntegration: """Test real Redis integration.""" @pytest.mark.asyncio - async def test_redis_storage_and_retrieval( + async def test_redis_storage_retrieval_and_ttl( self, usage_logging_config, redis_adapter, test_user ): - """Test logs are stored in Redis and can be retrieved with correct order and limits.""" - from unittest.mock import patch + """Test logs are stored, retrieved with correct order/limits, and TTL is set.""" @log_usage(function_name="redis_test", log_type="test") async def redis_func(data: str, user=None): @@ -231,45 +167,26 @@ class TestRealRedisIntegration: with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: mock_get.return_value = redis_adapter - # Test storage + # Storage await redis_func("test_data", user=test_user) logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) - assert len(logs) > 0 assert logs[0]["function_name"] == "redis_test" assert logs[0]["parameters"]["data"] == "test_data" - # Test order (most recent first) + # Order (most recent first) for i in range(3): await order_func(i, user=test_user) await asyncio.sleep(0.01) logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) - assert logs[0]["parameters"]["num"] == 2 - assert logs[1]["parameters"]["num"] == 1 - assert logs[2]["parameters"]["num"] == 0 + assert [log["parameters"]["num"] for log in logs[:3]] == [2, 1, 0] - # Test limit parameter - logs = await redis_adapter.get_usage_logs("test-user-123", limit=2) - assert len(logs) == 2 + # Limit + assert len(await redis_adapter.get_usage_logs("test-user-123", limit=2)) == 2 - @pytest.mark.asyncio - async def test_ttl_set_correctly(self, usage_logging_config, redis_adapter, test_user): - """Test that TTL is set correctly on Redis keys.""" - from unittest.mock import patch - - @log_usage(function_name="ttl_test", log_type="test") - async def ttl_func(user=None): - return "result" - - with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: - mock_get.return_value = redis_adapter - - await ttl_func(user=test_user) - - key = f"test_usage_logs:test-user-123" - ttl = await redis_adapter.async_redis.ttl(key) - assert ttl > 0 - assert ttl <= 604800 + # TTL + ttl = await redis_adapter.async_redis.ttl("test_usage_logs:test-user-123") + assert 0 < ttl <= 604800 class TestEdgeCases: @@ -277,8 +194,7 @@ class TestEdgeCases: @pytest.mark.asyncio async def test_edge_cases(self, usage_logging_config, redis_adapter, test_user): - """Test various edge cases: no params, defaults, complex structures, exceptions, None, circular refs.""" - from unittest.mock import patch + """Test no params, defaults, complex structures, exceptions, None, circular refs.""" @log_usage(function_name="no_params", log_type="test") async def no_params_func(user=None): @@ -306,12 +222,6 @@ class TestEdgeCases: async def none_func(user=None): return None - @log_usage(function_name="circular_test", log_type="test") - async def circular_func(user=None): - a = [] - a.append(a) - return a - with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: mock_get.return_value = redis_adapter @@ -327,9 +237,8 @@ class TestEdgeCases: assert logs[0]["parameters"]["param2"] == 42 # Complex nested structures - result = await complex_func(user=test_user) + await complex_func(user=test_user) logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) - assert "nested" in logs[0]["result"] assert isinstance(logs[0]["result"]["nested"]["uuid"], str) assert isinstance(logs[0]["result"]["nested"]["datetime"], str) @@ -341,13 +250,6 @@ class TestEdgeCases: assert "Test exception" in logs[0]["error"] # None return value - result = await none_func(user=test_user) - assert result is None + assert await none_func(user=test_user) is None logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) assert logs[0]["result"] is None - - # Circular reference - result = await circular_func(user=test_user) - assert isinstance(result, list) - logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) - assert "result" in logs[0] From 01c851cf80a779a7caa22461bc47809afd519740 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 16 Jan 2026 18:15:17 +0100 Subject: [PATCH 26/41] Adds usage logger e2e test --- .github/workflows/e2e_tests.yml | 43 ++++++ cognee/tests/test_usage_logger_e2e.py | 191 ++++++++++++++++++++++++++ 2 files changed, 234 insertions(+) create mode 100644 cognee/tests/test_usage_logger_e2e.py diff --git a/.github/workflows/e2e_tests.yml b/.github/workflows/e2e_tests.yml index badf88e71..a5e981b34 100644 --- a/.github/workflows/e2e_tests.yml +++ b/.github/workflows/e2e_tests.yml @@ -659,3 +659,46 @@ jobs: EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }} EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }} run: uv run python ./cognee/tests/test_pipeline_cache.py + + run_usage_logger_test: + name: Usage logger test (API/MCP) + runs-on: ubuntu-latest + defaults: + run: + shell: bash + services: + redis: + image: redis:7 + ports: + - 6379:6379 + options: >- + --health-cmd "redis-cli ping" + --health-interval 5s + --health-timeout 3s + --health-retries 5 + + steps: + - name: Checkout repository + uses: actions/checkout@v4 + + - name: Cognee Setup + uses: ./.github/actions/cognee_setup + with: + python-version: '3.11.x' + extra-dependencies: "redis" + + - name: Run api/tool usage logger + env: + ENV: dev + LLM_MODEL: ${{ secrets.LLM_MODEL }} + LLM_ENDPOINT: ${{ secrets.LLM_ENDPOINT }} + LLM_API_KEY: ${{ secrets.LLM_API_KEY }} + LLM_API_VERSION: ${{ secrets.LLM_API_VERSION }} + EMBEDDING_MODEL: ${{ secrets.EMBEDDING_MODEL }} + EMBEDDING_ENDPOINT: ${{ secrets.EMBEDDING_ENDPOINT }} + EMBEDDING_API_KEY: ${{ secrets.EMBEDDING_API_KEY }} + EMBEDDING_API_VERSION: ${{ secrets.EMBEDDING_API_VERSION }} + GRAPH_DATABASE_PROVIDER: 'kuzu' + USAGE_LOGGING: true + CACHE_BACKEND: 'redis' + run: uv run pytest cognee/tests/test_usage_logger_e2e.py -v --log-level=INFO diff --git a/cognee/tests/test_usage_logger_e2e.py b/cognee/tests/test_usage_logger_e2e.py new file mode 100644 index 000000000..8c78700f6 --- /dev/null +++ b/cognee/tests/test_usage_logger_e2e.py @@ -0,0 +1,191 @@ +import os +import pytest +import pytest_asyncio +import asyncio +from fastapi.testclient import TestClient + +import cognee +from cognee.api.client import app +from cognee.modules.users.methods import get_default_user, get_authenticated_user +from cognee.infrastructure.databases.cache.config import get_cache_config +from cognee.infrastructure.databases.cache.get_cache_engine import create_cache_engine +from cognee.infrastructure.databases.graph.get_graph_engine import create_graph_engine +from cognee.infrastructure.databases.vector.create_vector_engine import create_vector_engine +from cognee.infrastructure.databases.relational.create_relational_engine import ( + create_relational_engine, +) + + +async def _reset_engines_and_prune(): + """Reset db engine caches and prune data/system.""" + try: + from cognee.infrastructure.databases.vector import get_vector_engine + + vector_engine = get_vector_engine() + if hasattr(vector_engine, "engine") and hasattr(vector_engine.engine, "dispose"): + await vector_engine.engine.dispose(close=True) + except Exception: + pass + + create_graph_engine.cache_clear() + create_vector_engine.cache_clear() + create_relational_engine.cache_clear() + + await cognee.prune.prune_data() + await cognee.prune.prune_system(metadata=True) + + +@pytest.fixture(scope="session") +def event_loop(): + """Use a single asyncio event loop for this test module.""" + loop = asyncio.new_event_loop() + try: + yield loop + finally: + loop.close() + + +@pytest.fixture(scope="session") +def e2e_config(): + """Configure environment for E2E tests.""" + original_env = os.environ.copy() + os.environ["USAGE_LOGGING"] = "true" + os.environ["CACHE_BACKEND"] = "redis" + os.environ["CACHE_HOST"] = "localhost" + os.environ["CACHE_PORT"] = "6379" + get_cache_config.cache_clear() + create_cache_engine.cache_clear() + yield + os.environ.clear() + os.environ.update(original_env) + get_cache_config.cache_clear() + create_cache_engine.cache_clear() + + +@pytest.fixture(scope="session") +def authenticated_client(test_client): + """Override authentication to use default user.""" + + async def override_get_authenticated_user(): + return await get_default_user() + + app.dependency_overrides[get_authenticated_user] = override_get_authenticated_user + yield test_client + app.dependency_overrides.pop(get_authenticated_user, None) + + +@pytest_asyncio.fixture(scope="session") +async def test_data_setup(): + """Set up test data: prune first, then add file and cognify.""" + await _reset_engines_and_prune() + + dataset_name = "test_e2e_dataset" + test_text = "Germany is located in Europe right next to the Netherlands." + + await cognee.add(test_text, dataset_name) + await cognee.cognify([dataset_name]) + + yield dataset_name + + await _reset_engines_and_prune() + + +@pytest_asyncio.fixture +async def mcp_data_setup(): + """Set up test data for MCP tests: prune first, then add file and cognify.""" + await _reset_engines_and_prune() + + dataset_name = "test_mcp_dataset" + test_text = "Germany is located in Europe right next to the Netherlands." + + await cognee.add(test_text, dataset_name) + await cognee.cognify([dataset_name]) + + yield dataset_name + + await _reset_engines_and_prune() + + +@pytest.fixture(scope="session") +def test_client(): + """TestClient instance for API calls.""" + with TestClient(app) as client: + yield client + + +@pytest_asyncio.fixture +async def cache_engine(e2e_config): + """Get cache engine for log verification in test's event loop.""" + create_cache_engine.cache_clear() + from cognee.infrastructure.databases.cache.redis.RedisAdapter import RedisAdapter + from cognee.infrastructure.databases.cache.config import get_cache_config + + config = get_cache_config() + if not config.usage_logging or config.cache_backend != "redis": + pytest.skip("Redis usage logging not configured") + + engine = RedisAdapter( + host=config.cache_host, + port=config.cache_port, + username=config.cache_username, + password=config.cache_password, + log_key="usage_logs", + ) + return engine + + +@pytest.mark.asyncio +async def test_api_endpoint_logging(e2e_config, authenticated_client, cache_engine): + """Test that API endpoints succeed and log to Redis.""" + user = await get_default_user() + dataset_name = "test_e2e_api_dataset" + + add_response = authenticated_client.post( + "/api/v1/add", + data={"datasetName": dataset_name}, + files=[ + ( + "data", + ( + "test.txt", + b"Germany is located in Europe right next to the Netherlands.", + "text/plain", + ), + ) + ], + ) + assert add_response.status_code in [200, 201], f"Add endpoint failed: {add_response.text}" + + cognify_response = authenticated_client.post( + "/api/v1/cognify", + json={"datasets": [dataset_name], "run_in_background": False}, + ) + assert cognify_response.status_code in [200, 201], ( + f"Cognify endpoint failed: {cognify_response.text}" + ) + + search_response = authenticated_client.post( + "/api/v1/search", + json={"query": "Germany", "search_type": "GRAPH_COMPLETION", "datasets": [dataset_name]}, + ) + assert search_response.status_code == 200, f"Search endpoint failed: {search_response.text}" + + logs = await cache_engine.get_usage_logs(str(user.id), limit=20) + + add_logs = [log for log in logs if log.get("function_name") == "POST /v1/add"] + assert len(add_logs) > 0 + assert add_logs[0]["type"] == "api_endpoint" + assert add_logs[0]["user_id"] == str(user.id) + assert add_logs[0]["success"] is True + + cognify_logs = [log for log in logs if log.get("function_name") == "POST /v1/cognify"] + assert len(cognify_logs) > 0 + assert cognify_logs[0]["type"] == "api_endpoint" + assert cognify_logs[0]["user_id"] == str(user.id) + assert cognify_logs[0]["success"] is True + + search_logs = [log for log in logs if log.get("function_name") == "POST /v1/search"] + assert len(search_logs) > 0 + assert search_logs[0]["type"] == "api_endpoint" + assert search_logs[0]["user_id"] == str(user.id) + assert search_logs[0]["success"] is True From cd841363a69b8cfd27746b168de17c2093a087fe Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 16 Jan 2026 18:28:38 +0100 Subject: [PATCH 27/41] feat: adds mcp tool usage e2e test --- cognee/tests/test_usage_logger_e2e.py | 93 +++++++++++++++++++++++++++ 1 file changed, 93 insertions(+) diff --git a/cognee/tests/test_usage_logger_e2e.py b/cognee/tests/test_usage_logger_e2e.py index 8c78700f6..897097183 100644 --- a/cognee/tests/test_usage_logger_e2e.py +++ b/cognee/tests/test_usage_logger_e2e.py @@ -189,3 +189,96 @@ async def test_api_endpoint_logging(e2e_config, authenticated_client, cache_engi assert search_logs[0]["type"] == "api_endpoint" assert search_logs[0]["user_id"] == str(user.id) assert search_logs[0]["success"] is True + + +@pytest.mark.asyncio +async def test_mcp_tool_logging(e2e_config, cache_engine): + """Test that MCP tools succeed and log to Redis.""" + import sys + import importlib.util + from pathlib import Path + + await _reset_engines_and_prune() + + repo_root = Path(__file__).parent.parent.parent + mcp_src_path = repo_root / "cognee-mcp" / "src" + mcp_server_path = mcp_src_path / "server.py" + + if not mcp_server_path.exists(): + pytest.skip(f"MCP server not found at {mcp_server_path}") + + if str(mcp_src_path) not in sys.path: + sys.path.insert(0, str(mcp_src_path)) + + spec = importlib.util.spec_from_file_location("mcp_server_module", mcp_server_path) + mcp_server_module = importlib.util.module_from_spec(spec) + + import os + + original_cwd = os.getcwd() + try: + os.chdir(str(mcp_src_path)) + spec.loader.exec_module(mcp_server_module) + finally: + os.chdir(original_cwd) + + if mcp_server_module.cognee_client is None: + cognee_client_path = mcp_src_path / "cognee_client.py" + if cognee_client_path.exists(): + spec_client = importlib.util.spec_from_file_location( + "cognee_client", cognee_client_path + ) + cognee_client_module = importlib.util.module_from_spec(spec_client) + spec_client.loader.exec_module(cognee_client_module) + CogneeClient = cognee_client_module.CogneeClient + mcp_server_module.cognee_client = CogneeClient() + else: + pytest.skip(f"CogneeClient not found at {cognee_client_path}") + + test_text = "Germany is located in Europe right next to the Netherlands." + await mcp_server_module.cognify(data=test_text) + await asyncio.sleep(30.0) + + list_result = await mcp_server_module.list_data() + assert list_result is not None, "List data should return results" + + search_result = await mcp_server_module.search( + search_query="Germany", search_type="GRAPH_COMPLETION", top_k=5 + ) + assert search_result is not None, "Search should return results" + + interaction_data = "User: What is Germany?\nAgent: Germany is a country in Europe." + await mcp_server_module.save_interaction(data=interaction_data) + await asyncio.sleep(30.0) + + status_result = await mcp_server_module.cognify_status() + assert status_result is not None, "Cognify status should return results" + + await mcp_server_module.prune() + await asyncio.sleep(0.5) + + logs = await cache_engine.get_usage_logs("unknown", limit=50) + mcp_logs = [log for log in logs if log.get("type") == "mcp_tool"] + assert len(mcp_logs) > 0, ( + f"Should have MCP tool logs with user_id='unknown'. Found logs: {[log.get('function_name') for log in logs[:5]]}" + ) + assert len(mcp_logs) == 6 + function_names = [log.get("function_name") for log in mcp_logs] + expected_tools = [ + "MCP cognify", + "MCP list_data", + "MCP search", + "MCP save_interaction", + "MCP cognify_status", + "MCP prune", + ] + + for expected_tool in expected_tools: + assert expected_tool in function_names, ( + f"Should have {expected_tool} log. Found: {function_names}" + ) + + for log in mcp_logs: + assert log["type"] == "mcp_tool" + assert log["user_id"] == "unknown" + assert log["success"] is True From 9d373e76571069055ec2dcb8c9d1edc4acc17483 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 16 Jan 2026 18:41:40 +0100 Subject: [PATCH 28/41] chore updates mcp deps for CI --- .github/workflows/e2e_tests.yml | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/.github/workflows/e2e_tests.yml b/.github/workflows/e2e_tests.yml index a5e981b34..438a88172 100644 --- a/.github/workflows/e2e_tests.yml +++ b/.github/workflows/e2e_tests.yml @@ -687,6 +687,10 @@ jobs: python-version: '3.11.x' extra-dependencies: "redis" + - name: Sync cognee-mcp dependencies + working-directory: cognee-mcp + run: uv sync --no-dev + - name: Run api/tool usage logger env: ENV: dev From 61157725d15eb3b6028a2ac5a468d38b045d28af Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 16 Jan 2026 18:48:48 +0100 Subject: [PATCH 29/41] Update e2e_tests.yml --- .github/workflows/e2e_tests.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/e2e_tests.yml b/.github/workflows/e2e_tests.yml index 438a88172..f7d59fb21 100644 --- a/.github/workflows/e2e_tests.yml +++ b/.github/workflows/e2e_tests.yml @@ -687,9 +687,10 @@ jobs: python-version: '3.11.x' extra-dependencies: "redis" - - name: Sync cognee-mcp dependencies - working-directory: cognee-mcp - run: uv sync --no-dev + - name: Install correct dependencies for mcp + run: | + uv run pip install -e ./cognee-mcp + uv run pip install --force-reinstall -e . - name: Run api/tool usage logger env: From 58a4e34b5db0a45cf58b97832994ea376d372812 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 16 Jan 2026 18:54:08 +0100 Subject: [PATCH 30/41] Update e2e_tests.yml --- .github/workflows/e2e_tests.yml | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/.github/workflows/e2e_tests.yml b/.github/workflows/e2e_tests.yml index f7d59fb21..4ead0801b 100644 --- a/.github/workflows/e2e_tests.yml +++ b/.github/workflows/e2e_tests.yml @@ -687,10 +687,13 @@ jobs: python-version: '3.11.x' extra-dependencies: "redis" - - name: Install correct dependencies for mcp + - name: Install minimal MCP dependencies from cognee-mcp lockfile run: | - uv run pip install -e ./cognee-mcp - uv run pip install --force-reinstall -e . + # Extract exact versions of mcp, fastmcp, httpx from cognee-mcp lockfile and install only those + cd cognee-mcp + MCP_DEPS=$(uv export --no-hashes --format requirements | grep -E '^(mcp|fastmcp|httpx)==') + cd .. + echo "$MCP_DEPS" | xargs uv run pip install - name: Run api/tool usage logger env: From eb3e3984b34ad0aa41da04fdfdf8b44794e0834a Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 16 Jan 2026 18:56:38 +0100 Subject: [PATCH 31/41] Update e2e_tests.yml --- .github/workflows/e2e_tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/e2e_tests.yml b/.github/workflows/e2e_tests.yml index 4ead0801b..f3ab06a52 100644 --- a/.github/workflows/e2e_tests.yml +++ b/.github/workflows/e2e_tests.yml @@ -691,7 +691,7 @@ jobs: run: | # Extract exact versions of mcp, fastmcp, httpx from cognee-mcp lockfile and install only those cd cognee-mcp - MCP_DEPS=$(uv export --no-hashes --format requirements | grep -E '^(mcp|fastmcp|httpx)==') + MCP_DEPS=$(uv export --no-hashes --format requirements.txt | grep -E '^(mcp|fastmcp|httpx)==') cd .. echo "$MCP_DEPS" | xargs uv run pip install From cc9eae0285c92271bc5c24f5960b71f76337d230 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 16 Jan 2026 19:01:41 +0100 Subject: [PATCH 32/41] Update e2e_tests.yml --- .github/workflows/e2e_tests.yml | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/.github/workflows/e2e_tests.yml b/.github/workflows/e2e_tests.yml index f3ab06a52..290fa77a4 100644 --- a/.github/workflows/e2e_tests.yml +++ b/.github/workflows/e2e_tests.yml @@ -689,11 +689,15 @@ jobs: - name: Install minimal MCP dependencies from cognee-mcp lockfile run: | - # Extract exact versions of mcp, fastmcp, httpx from cognee-mcp lockfile and install only those + # Extract exact versions of mcp, fastmcp, httpx from cognee-mcp lockfile cd cognee-mcp MCP_DEPS=$(uv export --no-hashes --format requirements.txt | grep -E '^(mcp|fastmcp|httpx)==') cd .. - echo "$MCP_DEPS" | xargs uv run pip install + # Install into main .venv (where pytest runs) using the same Python + PYTHON=$(uv run which python) + echo "$MCP_DEPS" | "$PYTHON" -m pip install + # Verify installation + "$PYTHON" -c "import mcp; print('mcp version:', mcp.__version__)" - name: Run api/tool usage logger env: From cea732ea290dc8ac4e88f9b658e345dcecbfb67c Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 16 Jan 2026 19:03:52 +0100 Subject: [PATCH 33/41] Update e2e_tests.yml --- .github/workflows/e2e_tests.yml | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/.github/workflows/e2e_tests.yml b/.github/workflows/e2e_tests.yml index 290fa77a4..e5ac28a3a 100644 --- a/.github/workflows/e2e_tests.yml +++ b/.github/workflows/e2e_tests.yml @@ -687,17 +687,10 @@ jobs: python-version: '3.11.x' extra-dependencies: "redis" - - name: Install minimal MCP dependencies from cognee-mcp lockfile + - name: Install MCP dependencies from cognee-mcp pyproject.toml run: | - # Extract exact versions of mcp, fastmcp, httpx from cognee-mcp lockfile - cd cognee-mcp - MCP_DEPS=$(uv export --no-hashes --format requirements.txt | grep -E '^(mcp|fastmcp|httpx)==') - cd .. - # Install into main .venv (where pytest runs) using the same Python - PYTHON=$(uv run which python) - echo "$MCP_DEPS" | "$PYTHON" -m pip install - # Verify installation - "$PYTHON" -c "import mcp; print('mcp version:', mcp.__version__)" + # Extract dependencies directly from cognee-mcp/pyproject.toml and install them + grep -E '"(fastmcp|mcp|httpx)' cognee-mcp/pyproject.toml | sed 's/.*"\(.*\)".*/\1/' | xargs uv run pip install - name: Run api/tool usage logger env: From 97b392e69e6226216f7e451415968c17c174f306 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 16 Jan 2026 19:11:38 +0100 Subject: [PATCH 34/41] Update e2e_tests.yml --- .github/workflows/e2e_tests.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.github/workflows/e2e_tests.yml b/.github/workflows/e2e_tests.yml index e5ac28a3a..102de4206 100644 --- a/.github/workflows/e2e_tests.yml +++ b/.github/workflows/e2e_tests.yml @@ -687,10 +687,11 @@ jobs: python-version: '3.11.x' extra-dependencies: "redis" - - name: Install MCP dependencies from cognee-mcp pyproject.toml + - name: Install cognee-mcp (local version) + shell: bash run: | - # Extract dependencies directly from cognee-mcp/pyproject.toml and install them - grep -E '"(fastmcp|mcp|httpx)' cognee-mcp/pyproject.toml | sed 's/.*"\(.*\)".*/\1/' | xargs uv run pip install + uv pip install --no-deps -e ./cognee-mcp + uv pip install "fastmcp>=2.10.0,<3.0.0" "mcp>=1.12.0,<2.0.0" "httpx>=0.27.0,<1.0.0" - name: Run api/tool usage logger env: From ffe5393ea77f2a2645061c7f238d1b9ba9e7710b Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 16 Jan 2026 19:17:29 +0100 Subject: [PATCH 35/41] Update e2e_tests.yml --- .github/workflows/e2e_tests.yml | 1 - 1 file changed, 1 deletion(-) diff --git a/.github/workflows/e2e_tests.yml b/.github/workflows/e2e_tests.yml index 102de4206..312fdb137 100644 --- a/.github/workflows/e2e_tests.yml +++ b/.github/workflows/e2e_tests.yml @@ -691,7 +691,6 @@ jobs: shell: bash run: | uv pip install --no-deps -e ./cognee-mcp - uv pip install "fastmcp>=2.10.0,<3.0.0" "mcp>=1.12.0,<2.0.0" "httpx>=0.27.0,<1.0.0" - name: Run api/tool usage logger env: From 2aec13ea6e47733fed63569be9ecd1ab9d519271 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Fri, 16 Jan 2026 19:21:01 +0100 Subject: [PATCH 36/41] Update e2e_tests.yml --- .github/workflows/e2e_tests.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/e2e_tests.yml b/.github/workflows/e2e_tests.yml index 312fdb137..f6baf4242 100644 --- a/.github/workflows/e2e_tests.yml +++ b/.github/workflows/e2e_tests.yml @@ -690,7 +690,7 @@ jobs: - name: Install cognee-mcp (local version) shell: bash run: | - uv pip install --no-deps -e ./cognee-mcp + uv pip install -e ./cognee-mcp - name: Run api/tool usage logger env: From 68d102a5e2d5f50b7370ea3c275b5513c789e79f Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Mon, 19 Jan 2026 10:20:45 +0100 Subject: [PATCH 37/41] chore: removing cache clear method call --- cognee/tests/test_usage_logger_e2e.py | 9 --------- 1 file changed, 9 deletions(-) diff --git a/cognee/tests/test_usage_logger_e2e.py b/cognee/tests/test_usage_logger_e2e.py index 897097183..974413f3e 100644 --- a/cognee/tests/test_usage_logger_e2e.py +++ b/cognee/tests/test_usage_logger_e2e.py @@ -27,9 +27,6 @@ async def _reset_engines_and_prune(): except Exception: pass - create_graph_engine.cache_clear() - create_vector_engine.cache_clear() - create_relational_engine.cache_clear() await cognee.prune.prune_data() await cognee.prune.prune_system(metadata=True) @@ -53,14 +50,9 @@ def e2e_config(): os.environ["CACHE_BACKEND"] = "redis" os.environ["CACHE_HOST"] = "localhost" os.environ["CACHE_PORT"] = "6379" - get_cache_config.cache_clear() - create_cache_engine.cache_clear() yield os.environ.clear() os.environ.update(original_env) - get_cache_config.cache_clear() - create_cache_engine.cache_clear() - @pytest.fixture(scope="session") def authenticated_client(test_client): @@ -116,7 +108,6 @@ def test_client(): @pytest_asyncio.fixture async def cache_engine(e2e_config): """Get cache engine for log verification in test's event loop.""" - create_cache_engine.cache_clear() from cognee.infrastructure.databases.cache.redis.RedisAdapter import RedisAdapter from cognee.infrastructure.databases.cache.config import get_cache_config From 15a9563562398ce1609ee1e7689b907e3307e44a Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Mon, 19 Jan 2026 10:25:38 +0100 Subject: [PATCH 38/41] ruff --- cognee/tests/test_usage_logger_e2e.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cognee/tests/test_usage_logger_e2e.py b/cognee/tests/test_usage_logger_e2e.py index 974413f3e..cfaddee46 100644 --- a/cognee/tests/test_usage_logger_e2e.py +++ b/cognee/tests/test_usage_logger_e2e.py @@ -27,7 +27,6 @@ async def _reset_engines_and_prune(): except Exception: pass - await cognee.prune.prune_data() await cognee.prune.prune_system(metadata=True) @@ -54,6 +53,7 @@ def e2e_config(): os.environ.clear() os.environ.update(original_env) + @pytest.fixture(scope="session") def authenticated_client(test_client): """Override authentication to use default user.""" From a61ef4ad3f641666f5a8d6abb6df96369f15233f Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Mon, 19 Jan 2026 10:28:44 +0100 Subject: [PATCH 39/41] Update test_usage_logger_e2e.py --- cognee/tests/test_usage_logger_e2e.py | 7 ------- 1 file changed, 7 deletions(-) diff --git a/cognee/tests/test_usage_logger_e2e.py b/cognee/tests/test_usage_logger_e2e.py index cfaddee46..db34df2cb 100644 --- a/cognee/tests/test_usage_logger_e2e.py +++ b/cognee/tests/test_usage_logger_e2e.py @@ -7,13 +7,6 @@ from fastapi.testclient import TestClient import cognee from cognee.api.client import app from cognee.modules.users.methods import get_default_user, get_authenticated_user -from cognee.infrastructure.databases.cache.config import get_cache_config -from cognee.infrastructure.databases.cache.get_cache_engine import create_cache_engine -from cognee.infrastructure.databases.graph.get_graph_engine import create_graph_engine -from cognee.infrastructure.databases.vector.create_vector_engine import create_vector_engine -from cognee.infrastructure.databases.relational.create_relational_engine import ( - create_relational_engine, -) async def _reset_engines_and_prune(): From 77543dbd9089cafce7f3c7438a46571ea8cf0254 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Mon, 19 Jan 2026 12:43:27 +0100 Subject: [PATCH 40/41] chore: adds docstrings to usage logger --- cognee/shared/usage_logger.py | 53 ++++++++++++++++++++++++++++++++++- 1 file changed, 52 insertions(+), 1 deletion(-) diff --git a/cognee/shared/usage_logger.py b/cognee/shared/usage_logger.py index 33d2dac10..1c07aff47 100644 --- a/cognee/shared/usage_logger.py +++ b/cognee/shared/usage_logger.py @@ -29,6 +29,7 @@ def _sanitize_value(value: Any) -> Any: @_sanitize_value.register(type(None)) def _(value: None) -> None: + """Handle None values - returns None as-is.""" return None @@ -37,27 +38,32 @@ def _(value: None) -> None: @_sanitize_value.register(float) @_sanitize_value.register(bool) def _(value: str | int | float | bool) -> str | int | float | bool: + """Handle primitive types - returns value as-is since they're JSON-serializable.""" return value @_sanitize_value.register(UUID) def _(value: UUID) -> str: + """Convert UUID to string representation.""" return str(value) @_sanitize_value.register(datetime) def _(value: datetime) -> str: + """Convert datetime to ISO format string.""" return value.isoformat() @_sanitize_value.register(list) @_sanitize_value.register(tuple) def _(value: list | tuple) -> list: + """Recursively sanitize list or tuple elements.""" return [_sanitize_value(v) for v in value] @_sanitize_value.register(dict) def _(value: dict) -> dict: + """Recursively sanitize dictionary keys and values.""" sanitized = {} for k, v in value.items(): key_str = k if isinstance(k, str) else _sanitize_dict_key(k) @@ -151,7 +157,23 @@ async def _log_usage_async( start_time: datetime, end_time: datetime, ): - """Asynchronously log function usage to Redis.""" + """Asynchronously log function usage to Redis. + + Args: + function_name: Name of the function being logged. + log_type: Type of log entry (e.g., "api_endpoint", "mcp_tool", "function"). + user_id: User identifier, or None to use "unknown". + parameters: Dictionary of function parameters (sanitized). + result: Function return value (will be sanitized). + success: Whether the function executed successfully. + error: Error message if function failed, None otherwise. + start_time: Function start timestamp. + end_time: Function end timestamp. + + Note: + This function silently handles errors to avoid disrupting the original + function execution. Logs are written to Redis with TTL from config. + """ try: logger.debug(f"Starting to log usage for {function_name} at {start_time.isoformat()}") config = get_cache_config() @@ -224,6 +246,17 @@ def log_usage(function_name: Optional[str] = None, log_type: str = "function"): """ def decorator(func: Callable) -> Callable: + """Inner decorator that wraps the function with usage logging. + + Args: + func: The async function to wrap with usage logging. + + Returns: + Callable: The wrapped function with usage logging enabled. + + Raises: + UsageLoggerError: If the function is not async. + """ if not inspect.iscoroutinefunction(func): raise UsageLoggerError( f"@log_usage requires an async function. Got {func.__name__} which is not async." @@ -231,6 +264,24 @@ def log_usage(function_name: Optional[str] = None, log_type: str = "function"): @wraps(func) async def async_wrapper(*args, **kwargs): + """Wrapper function that executes the original function and logs usage. + + This wrapper: + - Extracts user ID and parameters from function arguments + - Executes the original function + - Captures result, success status, and any errors + - Logs usage information asynchronously without blocking + + Args: + *args: Positional arguments passed to the original function. + **kwargs: Keyword arguments passed to the original function. + + Returns: + Any: The return value of the original function. + + Raises: + Any exception raised by the original function (re-raised after logging). + """ config = get_cache_config() if not config.usage_logging: return await func(*args, **kwargs) From fb77f71c30688eb6145934ac33393d211bb892f3 Mon Sep 17 00:00:00 2001 From: hajdul88 <52442977+hajdul88@users.noreply.github.com> Date: Mon, 19 Jan 2026 12:43:46 +0100 Subject: [PATCH 41/41] ruff --- cognee/shared/usage_logger.py | 19 ++++++++++--------- 1 file changed, 10 insertions(+), 9 deletions(-) diff --git a/cognee/shared/usage_logger.py b/cognee/shared/usage_logger.py index 1c07aff47..5dde5b9ab 100644 --- a/cognee/shared/usage_logger.py +++ b/cognee/shared/usage_logger.py @@ -158,7 +158,7 @@ async def _log_usage_async( end_time: datetime, ): """Asynchronously log function usage to Redis. - + Args: function_name: Name of the function being logged. log_type: Type of log entry (e.g., "api_endpoint", "mcp_tool", "function"). @@ -167,9 +167,10 @@ async def _log_usage_async( result: Function return value (will be sanitized). success: Whether the function executed successfully. error: Error message if function failed, None otherwise. + duration_ms: Execution duration in milliseconds. start_time: Function start timestamp. end_time: Function end timestamp. - + Note: This function silently handles errors to avoid disrupting the original function execution. Logs are written to Redis with TTL from config. @@ -247,13 +248,13 @@ def log_usage(function_name: Optional[str] = None, log_type: str = "function"): def decorator(func: Callable) -> Callable: """Inner decorator that wraps the function with usage logging. - + Args: func: The async function to wrap with usage logging. - + Returns: Callable: The wrapped function with usage logging enabled. - + Raises: UsageLoggerError: If the function is not async. """ @@ -265,20 +266,20 @@ def log_usage(function_name: Optional[str] = None, log_type: str = "function"): @wraps(func) async def async_wrapper(*args, **kwargs): """Wrapper function that executes the original function and logs usage. - + This wrapper: - Extracts user ID and parameters from function arguments - Executes the original function - Captures result, success status, and any errors - Logs usage information asynchronously without blocking - + Args: *args: Positional arguments passed to the original function. **kwargs: Keyword arguments passed to the original function. - + Returns: Any: The return value of the original function. - + Raises: Any exception raised by the original function (re-raised after logging). """