feat: implements first version of usage_logger decorator

This commit is contained in:
hajdul88 2026-01-15 14:22:01 +01:00
parent 34513f2c10
commit e803f10417

View file

@ -0,0 +1,311 @@
import asyncio
import inspect
import os
from datetime import datetime, timezone
from functools import wraps
from typing import Any, Callable, Optional
from uuid import UUID
from cognee.infrastructure.databases.cache.config import get_cache_config
from cognee.infrastructure.databases.cache.get_cache_engine import get_cache_engine
from cognee.shared.logging_utils import get_logger
from cognee import __version__ as cognee_version
logger = get_logger("usage_logger")
def _sanitize_value(value: Any) -> Any:
"""Ensure value is JSON serializable - converts non-serializable values to default messages."""
if value is None:
return None
if isinstance(value, (str, int, float, bool)):
return value
if isinstance(value, (UUID,)):
return str(value)
if isinstance(value, datetime):
return value.isoformat()
if isinstance(value, (list, tuple)):
return [_sanitize_value(v) for v in value]
if isinstance(value, dict):
sanitized = {}
for k, v in value.items():
if isinstance(k, str):
key_str = k
else:
sanitized_key = _sanitize_value(k)
if isinstance(sanitized_key, str):
key_str = sanitized_key
else:
key_str = str(sanitized_key) if sanitized_key != "<cannot be serialized>" else f"<key:{type(k).__name__}>"
sanitized[key_str] = _sanitize_value(v)
return sanitized
try:
str_repr = str(value)
if str_repr.startswith("<") and str_repr.endswith(">"):
return f"<cannot be serialized: {type(value).__name__}>"
return str_repr
except Exception:
return f"<cannot be serialized: {type(value).__name__}>"
def _extract_user_id(args: tuple, kwargs: dict, func: Callable) -> Optional[str]:
"""Extract user_id from function arguments if available."""
try:
if "user" in kwargs and kwargs["user"] is not None:
user = kwargs["user"]
if hasattr(user, "id"):
return str(user.id)
sig = inspect.signature(func)
param_names = list(sig.parameters.keys())
for i, param_name in enumerate(param_names):
if i < len(args) and param_name == "user":
user = args[i]
if user is not None and hasattr(user, "id"):
return str(user.id)
return None
except Exception:
return None
def _extract_parameters(args: tuple, kwargs: dict, func: Callable) -> dict:
"""Extract function parameters - captures all parameters, sanitizes for JSON."""
params = {}
for key, value in kwargs.items():
if key != "user":
params[key] = _sanitize_value(value)
try:
sig = inspect.signature(func)
param_names = list(sig.parameters.keys())
for i, param_name in enumerate(param_names):
if i < len(args) and param_name != "user" and param_name not in kwargs:
params[param_name] = _sanitize_value(args[i])
except Exception:
for i, arg_value in enumerate(args):
if i not in params.values():
params[f"arg_{i}"] = _sanitize_value(arg_value)
return params
async def _log_usage_async(
function_name: str,
log_type: str,
user_id: Optional[str],
parameters: dict,
result: Any,
success: bool,
error: Optional[str],
duration_ms: float,
start_time: datetime,
end_time: datetime,
):
"""Asynchronously log function usage to Redis."""
try:
logger.debug(f"Starting to log usage for {function_name} at {start_time.isoformat()}")
config = get_cache_config()
if not config.usage_logging:
logger.debug("Usage logging disabled, skipping log")
return
logger.debug(f"Getting cache engine for {function_name}")
cache_engine = get_cache_engine(lock_key=None, log_key="usage_logging")
if cache_engine is None:
logger.warning(
f"Cache engine not available for usage logging (function: {function_name})"
)
return
logger.debug(f"Cache engine obtained for {function_name}")
if user_id is None:
user_id = "unknown"
logger.debug(f"No user_id provided, using 'unknown' for {function_name}")
log_entry = {
"timestamp": start_time.isoformat(),
"type": log_type,
"function_name": function_name,
"user_id": user_id,
"parameters": parameters,
"result": _sanitize_value(result),
"success": success,
"error": error,
"duration_ms": round(duration_ms, 2),
"start_time": start_time.isoformat(),
"end_time": end_time.isoformat(),
"metadata": {
"cognee_version": cognee_version,
"environment": os.getenv("ENV", "prod"),
},
}
logger.debug(f"Calling log_usage for {function_name}, user_id={user_id}")
await cache_engine.log_usage(
user_id=user_id,
log_entry=log_entry,
ttl=config.usage_logging_ttl,
)
logger.info(f"Successfully logged usage for {function_name} (user_id={user_id})")
except Exception as e:
logger.error(f"Failed to log usage for {function_name}: {str(e)}", exc_info=True)
def log_usage(function_name: Optional[str] = None, log_type: str = "function"):
"""
Decorator to log function usage to Redis.
This decorator is completely transparent - it doesn't change function behavior.
It logs function name, parameters, result, timing, and user (if available).
Args:
function_name: Optional name for the function (defaults to func.__name__)
log_type: Type of log entry (e.g., "api_endpoint", "mcp_tool", "function")
Usage:
@log_usage()
async def my_function(...):
# function code
@log_usage(function_name="POST /v1/add", log_type="api_endpoint")
async def add(...):
# endpoint code
"""
def decorator(func: Callable) -> Callable:
@wraps(func)
async def async_wrapper(*args, **kwargs):
config = get_cache_config()
if not config.usage_logging:
return await func(*args, **kwargs)
# Capture start time
start_time = datetime.now(timezone.utc)
user_id = _extract_user_id(args, kwargs, func)
parameters = _extract_parameters(args, kwargs, func)
result = None
success = True
error = None
try:
result = await func(*args, **kwargs)
return result
except Exception as e:
success = False
error = str(e)
raise
finally:
end_time = datetime.now(timezone.utc)
duration_ms = (end_time - start_time).total_seconds() * 1000
try:
await _log_usage_async(
function_name=function_name or func.__name__,
log_type=log_type,
user_id=user_id,
parameters=parameters,
result=result,
success=success,
error=error,
duration_ms=duration_ms,
start_time=start_time,
end_time=end_time,
)
except Exception as e:
logger.error(
f"Failed to log usage for {function_name or func.__name__}: {str(e)}",
exc_info=True,
)
return async_wrapper
return decorator
if __name__ == "__main__":
# Example 1: Simple function with decorator
@log_usage(function_name="example_function", log_type="example")
async def example_function(param1: str, param2: int, user=None):
"""Example function to demonstrate usage logging."""
await asyncio.sleep(0.1) # Simulate some work
return {(1, 2): "ok"}
# Example 2: Function with user parameter
class MockUser:
def __init__(self, user_id: str):
self.id = user_id
@log_usage(function_name="example_with_user", log_type="example")
async def example_with_user(data: str, user: MockUser, wrong_param=datetime.utcnow().isoformat()):
"""Example function with user parameter."""
await asyncio.sleep(0.05)
return float("nan")
@log_usage(function_name="returns_cycle", log_type="function")
async def returns_cycle():
a = []
a.append(a)
return a
async def run_example():
"""Run example demonstrations."""
print("Usage Logger Example")
print("=" * 50)
# Example 1: Simple function
print("\n1. Running example function:")
result1 = await example_function("example_data", 42)
print(f" Result: {result1}")
await asyncio.sleep(0.2) # Wait for async logging to complete
# Example 2: Function with user
print("\n2. Running example function with user:")
mock_user = MockUser("example-user-123")
result2 = await example_with_user("sample_data", user=mock_user, wrong_param=datetime.utcnow().isoformat())
result3 = await example_with_user("sample_data", user=mock_user)
print(f" Result: {result2}")
await asyncio.sleep(0.2) # Wait for async logging to complete
await returns_cycle()
# Example 3: Retrieve logs (if cache engine is available)
print("\n3. Retrieving usage logs:")
try:
config = get_cache_config()
if config.usage_logging:
cache_engine = get_cache_engine(lock_key="usage_logging")
if cache_engine:
# Get logs for the user
user_id = str(mock_user.id)
logs = await cache_engine.get_usage_logs(user_id, limit=10)
print(f" Found {len(logs)} log entries for user {user_id}")
if logs:
print(
f" Latest log: {logs[0]['function_name']} at {logs[0]['timestamp']}"
)
else:
print(" Cache engine not available")
else:
print(" Usage logging is disabled (set USAGE_LOGGING=true)")
except Exception as e:
print(f" Error retrieving logs: {str(e)}")
print("\n" + "=" * 50)
print("Example completed!")
print("\nNote: Make sure to set these environment variables:")
print(" - USAGE_LOGGING=true")
print(" - CACHING=true (or ensure cache backend is configured)")
print(" - CACHE_BACKEND=redis (or fs)")
print(" - CACHE_HOST=localhost")
print(" - CACHE_PORT=6379")
# Run the example
asyncio.run(run_example())