chore: makes integration test a bit cleaner

This commit is contained in:
hajdul88 2026-01-16 16:18:18 +01:00
parent 3aadb91a6f
commit 4e1e3dcfb9

View file

@ -1,9 +1,13 @@
"""Integration tests for usage logger with real Redis components."""
import os import os
import pytest import pytest
import asyncio import asyncio
from datetime import datetime, timezone from datetime import datetime, timezone
from types import SimpleNamespace from types import SimpleNamespace
from uuid import UUID from uuid import UUID
from unittest.mock import patch
from cognee.shared.usage_logger import log_usage from cognee.shared.usage_logger import log_usage
from cognee.infrastructure.databases.cache.config import get_cache_config from cognee.infrastructure.databases.cache.config import get_cache_config
from cognee.infrastructure.databases.cache.get_cache_engine import ( from cognee.infrastructure.databases.cache.get_cache_engine import (
@ -50,8 +54,7 @@ def redis_adapter():
from cognee.infrastructure.databases.cache.redis.RedisAdapter import RedisAdapter from cognee.infrastructure.databases.cache.redis.RedisAdapter import RedisAdapter
try: try:
adapter = RedisAdapter(host="localhost", port=6379, log_key="test_usage_logs") yield RedisAdapter(host="localhost", port=6379, log_key="test_usage_logs")
yield adapter
except Exception as e: except Exception as e:
pytest.skip(f"Redis not available: {e}") pytest.skip(f"Redis not available: {e}")
@ -66,8 +69,11 @@ class TestDecoratorBehavior:
"""Test decorator behavior with real components.""" """Test decorator behavior with real components."""
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_decorator_skips_when_disabled(self, usage_logging_disabled): async def test_decorator_configuration(
"""Test decorator skips logging when usage_logging=False.""" self, usage_logging_disabled, usage_logging_config, redis_adapter
):
"""Test decorator skips when disabled and logs when enabled."""
# Test disabled
call_count = 0 call_count = 0
@log_usage(function_name="test_func", log_type="test") @log_usage(function_name="test_func", log_type="test")
@ -79,10 +85,14 @@ class TestDecoratorBehavior:
assert await test_func() == "result" assert await test_func() == "result"
assert call_count == 1 assert call_count == 1
# Test enabled with cache engine None
with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get:
mock_get.return_value = None
assert await test_func() == "result"
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_decorator_basic_logging(self, usage_logging_config, redis_adapter, test_user): async def test_decorator_logging(self, usage_logging_config, redis_adapter, test_user):
"""Test decorator logs to Redis and handles various scenarios.""" """Test decorator logs to Redis with correct structure."""
from unittest.mock import patch
@log_usage(function_name="test_func", log_type="test") @log_usage(function_name="test_func", log_type="test")
async def test_func(param1: str, param2: int = 42, user=None): async def test_func(param1: str, param2: int = 42, user=None):
@ -92,12 +102,10 @@ class TestDecoratorBehavior:
with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get:
mock_get.return_value = redis_adapter mock_get.return_value = redis_adapter
# Test basic logging
result = await test_func("value1", user=test_user) result = await test_func("value1", user=test_user)
assert result == {"result": "value1_42"} assert result == {"result": "value1_42"}
logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) logs = await redis_adapter.get_usage_logs("test-user-123", limit=10)
assert len(logs) > 0
log = logs[0] log = logs[0]
assert log["function_name"] == "test_func" assert log["function_name"] == "test_func"
assert log["type"] == "test" assert log["type"] == "test"
@ -105,88 +113,23 @@ class TestDecoratorBehavior:
assert log["parameters"]["param1"] == "value1" assert log["parameters"]["param1"] == "value1"
assert log["parameters"]["param2"] == 42 assert log["parameters"]["param2"] == 42
assert log["success"] is True assert log["success"] is True
assert all(
# Test log entry structure field in log
required_fields = [ for field in [
"timestamp", "timestamp",
"type", "result",
"function_name", "error",
"user_id", "duration_ms",
"parameters", "start_time",
"result", "end_time",
"success", "metadata",
"error", ]
"duration_ms", )
"start_time",
"end_time",
"metadata",
]
for field in required_fields:
assert field in log
assert "cognee_version" in log["metadata"] assert "cognee_version" in log["metadata"]
assert "environment" in log["metadata"]
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_decorator_handles_cache_engine_none(self, usage_logging_config): async def test_multiple_calls(self, usage_logging_config, redis_adapter, test_user):
"""Test decorator handles gracefully when cache engine is None.""" """Test multiple consecutive calls are all logged."""
from unittest.mock import patch
@log_usage(function_name="test_func", log_type="test")
async def test_func():
return "result"
with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get:
mock_get.return_value = None
assert await test_func() == "result"
@pytest.mark.asyncio
async def test_success_and_failure_logging(
self, usage_logging_config, redis_adapter, test_user
):
"""Test successful and failed execution logging."""
from unittest.mock import patch
@log_usage(function_name="success_test", log_type="test")
async def success_func(data: str, user=None):
await asyncio.sleep(0.01)
return {"status": "success", "data": data}
@log_usage(function_name="fail_test", log_type="test")
async def fail_func(user=None):
raise ValueError("Test error")
with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get:
mock_get.return_value = redis_adapter
# Test success
result = await success_func("test_data", user=test_user)
assert result == {"status": "success", "data": "test_data"}
logs = await redis_adapter.get_usage_logs("test-user-123", limit=2)
success_log = logs[0]
assert success_log["success"] is True
assert success_log["error"] is None
assert success_log["result"]["status"] == "success"
assert success_log["duration_ms"] > 0
# Test failure
with pytest.raises(ValueError, match="Test error"):
await fail_func(user=test_user)
logs = await redis_adapter.get_usage_logs("test-user-123", limit=2)
fail_log = logs[0]
assert fail_log["success"] is False
assert fail_log["error"] == "Test error"
@pytest.mark.asyncio
async def test_timing_and_multiple_calls(self, usage_logging_config, redis_adapter, test_user):
"""Test timing accuracy and multiple consecutive calls."""
from unittest.mock import patch
@log_usage(function_name="timing_test", log_type="test")
async def timing_func(user=None):
await asyncio.sleep(0.1)
return "done"
@log_usage(function_name="multi_test", log_type="test") @log_usage(function_name="multi_test", log_type="test")
async def multi_func(call_num: int, user=None): async def multi_func(call_num: int, user=None):
@ -195,30 +138,23 @@ class TestDecoratorBehavior:
with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get:
mock_get.return_value = redis_adapter mock_get.return_value = redis_adapter
# Test timing
await timing_func(user=test_user)
logs = await redis_adapter.get_usage_logs("test-user-123", limit=1)
assert 50 <= logs[0]["duration_ms"] <= 200
# Test multiple calls
for i in range(3): for i in range(3):
await multi_func(i, user=test_user) await multi_func(i, user=test_user)
logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) logs = await redis_adapter.get_usage_logs("test-user-123", limit=10)
assert len(logs) >= 3 assert len(logs) >= 3
call_nums = [log["parameters"]["call_num"] for log in logs[:3]] call_nums = {log["parameters"]["call_num"] for log in logs[:3]}
assert set(call_nums) == {0, 1, 2} assert call_nums == {0, 1, 2}
class TestRealRedisIntegration: class TestRealRedisIntegration:
"""Test real Redis integration.""" """Test real Redis integration."""
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_redis_storage_and_retrieval( async def test_redis_storage_retrieval_and_ttl(
self, usage_logging_config, redis_adapter, test_user self, usage_logging_config, redis_adapter, test_user
): ):
"""Test logs are stored in Redis and can be retrieved with correct order and limits.""" """Test logs are stored, retrieved with correct order/limits, and TTL is set."""
from unittest.mock import patch
@log_usage(function_name="redis_test", log_type="test") @log_usage(function_name="redis_test", log_type="test")
async def redis_func(data: str, user=None): async def redis_func(data: str, user=None):
@ -231,45 +167,26 @@ class TestRealRedisIntegration:
with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get:
mock_get.return_value = redis_adapter mock_get.return_value = redis_adapter
# Test storage # Storage
await redis_func("test_data", user=test_user) await redis_func("test_data", user=test_user)
logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) logs = await redis_adapter.get_usage_logs("test-user-123", limit=10)
assert len(logs) > 0
assert logs[0]["function_name"] == "redis_test" assert logs[0]["function_name"] == "redis_test"
assert logs[0]["parameters"]["data"] == "test_data" assert logs[0]["parameters"]["data"] == "test_data"
# Test order (most recent first) # Order (most recent first)
for i in range(3): for i in range(3):
await order_func(i, user=test_user) await order_func(i, user=test_user)
await asyncio.sleep(0.01) await asyncio.sleep(0.01)
logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) logs = await redis_adapter.get_usage_logs("test-user-123", limit=10)
assert logs[0]["parameters"]["num"] == 2 assert [log["parameters"]["num"] for log in logs[:3]] == [2, 1, 0]
assert logs[1]["parameters"]["num"] == 1
assert logs[2]["parameters"]["num"] == 0
# Test limit parameter # Limit
logs = await redis_adapter.get_usage_logs("test-user-123", limit=2) assert len(await redis_adapter.get_usage_logs("test-user-123", limit=2)) == 2
assert len(logs) == 2
@pytest.mark.asyncio # TTL
async def test_ttl_set_correctly(self, usage_logging_config, redis_adapter, test_user): ttl = await redis_adapter.async_redis.ttl("test_usage_logs:test-user-123")
"""Test that TTL is set correctly on Redis keys.""" assert 0 < ttl <= 604800
from unittest.mock import patch
@log_usage(function_name="ttl_test", log_type="test")
async def ttl_func(user=None):
return "result"
with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get:
mock_get.return_value = redis_adapter
await ttl_func(user=test_user)
key = f"test_usage_logs:test-user-123"
ttl = await redis_adapter.async_redis.ttl(key)
assert ttl > 0
assert ttl <= 604800
class TestEdgeCases: class TestEdgeCases:
@ -277,8 +194,7 @@ class TestEdgeCases:
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_edge_cases(self, usage_logging_config, redis_adapter, test_user): async def test_edge_cases(self, usage_logging_config, redis_adapter, test_user):
"""Test various edge cases: no params, defaults, complex structures, exceptions, None, circular refs.""" """Test no params, defaults, complex structures, exceptions, None, circular refs."""
from unittest.mock import patch
@log_usage(function_name="no_params", log_type="test") @log_usage(function_name="no_params", log_type="test")
async def no_params_func(user=None): async def no_params_func(user=None):
@ -306,12 +222,6 @@ class TestEdgeCases:
async def none_func(user=None): async def none_func(user=None):
return None return None
@log_usage(function_name="circular_test", log_type="test")
async def circular_func(user=None):
a = []
a.append(a)
return a
with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get: with patch("cognee.shared.usage_logger.get_cache_engine") as mock_get:
mock_get.return_value = redis_adapter mock_get.return_value = redis_adapter
@ -327,9 +237,8 @@ class TestEdgeCases:
assert logs[0]["parameters"]["param2"] == 42 assert logs[0]["parameters"]["param2"] == 42
# Complex nested structures # Complex nested structures
result = await complex_func(user=test_user) await complex_func(user=test_user)
logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) logs = await redis_adapter.get_usage_logs("test-user-123", limit=10)
assert "nested" in logs[0]["result"]
assert isinstance(logs[0]["result"]["nested"]["uuid"], str) assert isinstance(logs[0]["result"]["nested"]["uuid"], str)
assert isinstance(logs[0]["result"]["nested"]["datetime"], str) assert isinstance(logs[0]["result"]["nested"]["datetime"], str)
@ -341,13 +250,6 @@ class TestEdgeCases:
assert "Test exception" in logs[0]["error"] assert "Test exception" in logs[0]["error"]
# None return value # None return value
result = await none_func(user=test_user) assert await none_func(user=test_user) is None
assert result is None
logs = await redis_adapter.get_usage_logs("test-user-123", limit=10) logs = await redis_adapter.get_usage_logs("test-user-123", limit=10)
assert logs[0]["result"] is None assert logs[0]["result"] is None
# Circular reference
result = await circular_func(user=test_user)
assert isinstance(result, list)
logs = await redis_adapter.get_usage_logs("test-user-123", limit=10)
assert "result" in logs[0]