#!/usr/bin/env python3
"""
Stress and load testing for Graphiti MCP Server.
Tests system behavior under high load, resource constraints, and edge conditions.
"""

import asyncio
import gc
import json
import os
import psutil
import random
import time
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import pytest
from test_fixtures import TestDataGenerator, graphiti_test_client, PerformanceBenchmark


@dataclass
class LoadTestConfig:
    """Configuration for load testing scenarios."""

    num_clients: int = 10
    operations_per_client: int = 100
    ramp_up_time: float = 5.0  # seconds
    test_duration: float = 60.0  # seconds
    target_throughput: Optional[float] = None  # ops/sec
    think_time: float = 0.1  # seconds between ops

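# Illustrative only: a lighter configuration for a quick smoke run might look like
#   LoadTestConfig(num_clients=2, operations_per_client=5, ramp_up_time=0.5,
#                  test_duration=10.0, think_time=0.2)
# The defaults above describe a moderate sustained load. Note that
# target_throughput is informational; nothing in this module enforces it.
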
@dataclass
class LoadTestResult:
    """Results from a load test run."""

    total_operations: int
    successful_operations: int
    failed_operations: int
    duration: float
    throughput: float
    average_latency: float
    p50_latency: float
    p95_latency: float
    p99_latency: float
    max_latency: float
    errors: Dict[str, int]
    resource_usage: Dict[str, float]


class LoadTester:
    """Orchestrate load testing scenarios."""

    def __init__(self, config: LoadTestConfig):
        self.config = config
        self.metrics: List[Tuple[float, float, bool]] = []  # (start, duration, success)
        self.errors: Dict[str, int] = {}
        self.start_time: Optional[float] = None

    async def run_client_workload(
        self,
        client_id: int,
        session,
        group_id: str
    ) -> Dict[str, int]:
        """Run workload for a single simulated client."""
        stats = {'success': 0, 'failure': 0}
        data_gen = TestDataGenerator()

        # Ramp-up delay
        ramp_delay = (client_id / self.config.num_clients) * self.config.ramp_up_time
        await asyncio.sleep(ramp_delay)

        for op_num in range(self.config.operations_per_client):
            operation_start = time.time()

            try:
                # Randomly select operation type
                operation = random.choice([
                    'add_memory',
                    'search_memory_nodes',
                    'get_episodes',
                ])

                if operation == 'add_memory':
                    args = {
                        'name': f'Load Test {client_id}-{op_num}',
                        'episode_body': data_gen.generate_technical_document(),
                        'source': 'text',
                        'source_description': 'load test',
                        'group_id': group_id,
                    }
                elif operation == 'search_memory_nodes':
                    args = {
                        'query': random.choice(['performance', 'architecture', 'test', 'data']),
                        'group_id': group_id,
                        'limit': 10,
                    }
                else:  # get_episodes
                    args = {
                        'group_id': group_id,
                        'last_n': 10,
                    }

                # Execute operation with timeout
                result = await asyncio.wait_for(
                    session.call_tool(operation, args),
                    timeout=30.0
                )

                duration = time.time() - operation_start
                self.metrics.append((operation_start, duration, True))
                stats['success'] += 1

            except asyncio.TimeoutError:
                duration = time.time() - operation_start
                self.metrics.append((operation_start, duration, False))
                self.errors['timeout'] = self.errors.get('timeout', 0) + 1
                stats['failure'] += 1

            except Exception as e:
                duration = time.time() - operation_start
                self.metrics.append((operation_start, duration, False))
                error_type = type(e).__name__
                self.errors[error_type] = self.errors.get(error_type, 0) + 1
                stats['failure'] += 1

            # Think time between operations
            await asyncio.sleep(self.config.think_time)

            # Stop if we've exceeded test duration
            if self.start_time and (time.time() - self.start_time) > self.config.test_duration:
                break

        return stats

    def calculate_results(self) -> LoadTestResult:
        """Calculate load test results from metrics."""
        if not self.metrics:
            return LoadTestResult(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, {}, {})

        successful = [m for m in self.metrics if m[2]]
        failed = [m for m in self.metrics if not m[2]]

        latencies = sorted([m[1] for m in self.metrics])
        duration = max([m[0] + m[1] for m in self.metrics]) - min([m[0] for m in self.metrics])

        # Calculate percentiles
        def percentile(data: List[float], p: float) -> float:
            if not data:
                return 0.0
            idx = int(len(data) * p / 100)
            return data[min(idx, len(data) - 1)]

        # Get resource usage
        process = psutil.Process()
        resource_usage = {
            'cpu_percent': process.cpu_percent(),
            'memory_mb': process.memory_info().rss / 1024 / 1024,
            'num_threads': process.num_threads(),
        }

        return LoadTestResult(
            total_operations=len(self.metrics),
            successful_operations=len(successful),
            failed_operations=len(failed),
            duration=duration,
            throughput=len(self.metrics) / duration if duration > 0 else 0,
            average_latency=sum(latencies) / len(latencies) if latencies else 0,
            p50_latency=percentile(latencies, 50),
            p95_latency=percentile(latencies, 95),
            p99_latency=percentile(latencies, 99),
            max_latency=max(latencies) if latencies else 0,
            errors=self.errors,
            resource_usage=resource_usage,
        )

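# Typical driving pattern for LoadTester (see test_sustained_load below for the
# in-test version); `session` and `group_id` come from graphiti_test_client():
#
#   tester = LoadTester(LoadTestConfig(num_clients=5))
#   tester.start_time = time.time()
#   await asyncio.gather(*(
#       tester.run_client_workload(cid, session, group_id)
#       for cid in range(tester.config.num_clients)
#   ))
#   results = tester.calculate_results()
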
class TestLoadScenarios:
    """Various load testing scenarios."""

    @pytest.mark.asyncio
    @pytest.mark.slow
    async def test_sustained_load(self):
        """Test system under sustained moderate load."""
        config = LoadTestConfig(
            num_clients=5,
            operations_per_client=20,
            ramp_up_time=2.0,
            test_duration=30.0,
            think_time=0.5,
        )

        async with graphiti_test_client() as (session, group_id):
            tester = LoadTester(config)
            tester.start_time = time.time()

            # Run client workloads
            client_tasks = []
            for client_id in range(config.num_clients):
                task = tester.run_client_workload(client_id, session, group_id)
                client_tasks.append(task)

            # Execute all clients
            await asyncio.gather(*client_tasks)

            # Calculate results
            results = tester.calculate_results()

            # Assertions
            assert results.successful_operations > results.failed_operations
            assert results.average_latency < 5.0, f"Average latency too high: {results.average_latency:.2f}s"
            assert results.p95_latency < 10.0, f"P95 latency too high: {results.p95_latency:.2f}s"

            # Report results
            print(f"\nSustained Load Test Results:")
            print(f" Total operations: {results.total_operations}")
            print(f" Success rate: {results.successful_operations / results.total_operations * 100:.1f}%")
            print(f" Throughput: {results.throughput:.2f} ops/s")
            print(f" Avg latency: {results.average_latency:.2f}s")
            print(f" P95 latency: {results.p95_latency:.2f}s")

    @pytest.mark.asyncio
    @pytest.mark.slow
    async def test_spike_load(self):
        """Test system response to sudden load spikes."""
        async with graphiti_test_client() as (session, group_id):
            # Normal load phase
            normal_tasks = []
            for i in range(3):
                task = session.call_tool(
                    'add_memory',
                    {
                        'name': f'Normal Load {i}',
                        'episode_body': 'Normal operation',
                        'source': 'text',
                        'source_description': 'normal',
                        'group_id': group_id,
                    }
                )
                normal_tasks.append(task)
                await asyncio.sleep(0.5)

            await asyncio.gather(*normal_tasks)

            # Spike phase - sudden burst of requests
            spike_start = time.time()
            spike_tasks = []
            for i in range(50):
                task = session.call_tool(
                    'add_memory',
                    {
                        'name': f'Spike Load {i}',
                        'episode_body': TestDataGenerator.generate_technical_document(),
                        'source': 'text',
                        'source_description': 'spike',
                        'group_id': group_id,
                    }
                )
                spike_tasks.append(task)

            # Execute spike
            spike_results = await asyncio.gather(*spike_tasks, return_exceptions=True)
            spike_duration = time.time() - spike_start

            # Analyze spike handling
            spike_failures = sum(1 for r in spike_results if isinstance(r, Exception))
            spike_success_rate = (len(spike_results) - spike_failures) / len(spike_results)

            print(f"\nSpike Load Test Results:")
            print(f" Spike size: {len(spike_tasks)} operations")
            print(f" Duration: {spike_duration:.2f}s")
            print(f" Success rate: {spike_success_rate * 100:.1f}%")
            print(f" Throughput: {len(spike_tasks) / spike_duration:.2f} ops/s")

            # System should handle at least 80% of spike
            assert spike_success_rate > 0.8, f"Too many failures during spike: {spike_failures}"

    @pytest.mark.asyncio
    @pytest.mark.slow
    async def test_memory_leak_detection(self):
        """Test for memory leaks during extended operation."""
        async with graphiti_test_client() as (session, group_id):
            process = psutil.Process()
            gc.collect()  # Force garbage collection
            initial_memory = process.memory_info().rss / 1024 / 1024  # MB

            # Perform many operations
            for batch in range(10):
                batch_tasks = []
                for i in range(10):
                    task = session.call_tool(
                        'add_memory',
                        {
                            'name': f'Memory Test {batch}-{i}',
                            'episode_body': TestDataGenerator.generate_technical_document(),
                            'source': 'text',
                            'source_description': 'memory test',
                            'group_id': group_id,
                        }
                    )
                    batch_tasks.append(task)

                await asyncio.gather(*batch_tasks)

                # Force garbage collection between batches
                gc.collect()
                await asyncio.sleep(1)

            # Check memory after operations
            gc.collect()
            final_memory = process.memory_info().rss / 1024 / 1024  # MB
            memory_growth = final_memory - initial_memory

            print(f"\nMemory Leak Test:")
            print(f" Initial memory: {initial_memory:.1f} MB")
            print(f" Final memory: {final_memory:.1f} MB")
            print(f" Growth: {memory_growth:.1f} MB")

            # Allow for some memory growth but flag potential leaks
            # This is a soft check - actual threshold depends on system
            if memory_growth > 100:  # More than 100MB growth
                print(f" ⚠️ Potential memory leak detected: {memory_growth:.1f} MB growth")

    @pytest.mark.asyncio
    @pytest.mark.slow
    async def test_connection_pool_exhaustion(self):
        """Test behavior when connection pools are exhausted."""
        async with graphiti_test_client() as (session, group_id):
            # Create many concurrent long-running operations
            long_tasks = []
            for i in range(100):  # Many more than typical pool size
                task = session.call_tool(
                    'search_memory_nodes',
                    {
                        'query': f'complex query {i} ' + ' '.join([TestDataGenerator.fake.word() for _ in range(10)]),
                        'group_id': group_id,
                        'limit': 100,
                    }
                )
                long_tasks.append(task)

            # Execute with timeout
            try:
                results = await asyncio.wait_for(
                    asyncio.gather(*long_tasks, return_exceptions=True),
                    timeout=60.0
                )

                # Count connection-related errors
                connection_errors = sum(
                    1 for r in results
                    if isinstance(r, Exception) and 'connection' in str(r).lower()
                )

                print(f"\nConnection Pool Test:")
                print(f" Total requests: {len(long_tasks)}")
                print(f" Connection errors: {connection_errors}")

            except asyncio.TimeoutError:
                print(" Test timed out - possible deadlock or exhaustion")

    @pytest.mark.asyncio
    @pytest.mark.slow
    async def test_gradual_degradation(self):
        """Test system degradation under increasing load."""
        async with graphiti_test_client() as (session, group_id):
            load_levels = [5, 10, 20, 40, 80]  # Increasing concurrent operations
            results_by_level = {}

            for level in load_levels:
                level_start = time.time()
                tasks = []

                for i in range(level):
                    task = session.call_tool(
                        'add_memory',
                        {
                            'name': f'Load Level {level} Op {i}',
                            'episode_body': f'Testing at load level {level}',
                            'source': 'text',
                            'source_description': 'degradation test',
                            'group_id': group_id,
                        }
                    )
                    tasks.append(task)

                # Execute level
                level_results = await asyncio.gather(*tasks, return_exceptions=True)
                level_duration = time.time() - level_start

                # Calculate metrics
                failures = sum(1 for r in level_results if isinstance(r, Exception))
                success_rate = (level - failures) / level * 100
                throughput = level / level_duration

                results_by_level[level] = {
                    'success_rate': success_rate,
                    'throughput': throughput,
                    'duration': level_duration,
                }

                print(f"\nLoad Level {level}:")
                print(f" Success rate: {success_rate:.1f}%")
                print(f" Throughput: {throughput:.2f} ops/s")
                print(f" Duration: {level_duration:.2f}s")

                # Brief pause between levels
                await asyncio.sleep(2)

            # Verify graceful degradation
            # Success rate should not drop below 50% even at high load
            for level, metrics in results_by_level.items():
                assert metrics['success_rate'] > 50, f"Poor performance at load level {level}"

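# Note: the resource-limit tests below are not marked @pytest.mark.slow, so the
# `-m slow` invocation in the __main__ block at the bottom of this file selects
# only the TestLoadScenarios class. Running pytest on this module without the
# marker filter should exercise both classes.
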
class TestResourceLimits:
    """Test behavior at resource limits."""

    @pytest.mark.asyncio
    async def test_large_payload_handling(self):
        """Test handling of very large payloads."""
        async with graphiti_test_client() as (session, group_id):
            payload_sizes = [
                (1_000, "1KB"),
                (10_000, "10KB"),
                (100_000, "100KB"),
                (1_000_000, "1MB"),
            ]

            for size, label in payload_sizes:
                content = "x" * size

                start_time = time.time()
                try:
                    result = await asyncio.wait_for(
                        session.call_tool(
                            'add_memory',
                            {
                                'name': f'Large Payload {label}',
                                'episode_body': content,
                                'source': 'text',
                                'source_description': 'payload test',
                                'group_id': group_id,
                            }
                        ),
                        timeout=30.0
                    )
                    duration = time.time() - start_time
                    status = "✅ Success"

                except asyncio.TimeoutError:
                    duration = 30.0
                    status = "⏱️ Timeout"

                except Exception as e:
                    duration = time.time() - start_time
                    status = f"❌ Error: {type(e).__name__}"

                print(f"Payload {label}: {status} ({duration:.2f}s)")

    @pytest.mark.asyncio
    async def test_rate_limit_handling(self):
        """Test handling of rate limits."""
        async with graphiti_test_client() as (session, group_id):
            # Rapid fire requests to trigger rate limits
            rapid_tasks = []
            for i in range(100):
                task = session.call_tool(
                    'add_memory',
                    {
                        'name': f'Rate Limit Test {i}',
                        'episode_body': f'Testing rate limit {i}',
                        'source': 'text',
                        'source_description': 'rate test',
                        'group_id': group_id,
                    }
                )
                rapid_tasks.append(task)

            # Execute without delays
            results = await asyncio.gather(*rapid_tasks, return_exceptions=True)

            # Count rate limit errors
            rate_limit_errors = sum(
                1 for r in results
                if isinstance(r, Exception) and ('rate' in str(r).lower() or '429' in str(r))
            )

            print(f"\nRate Limit Test:")
            print(f" Total requests: {len(rapid_tasks)}")
            print(f" Rate limit errors: {rate_limit_errors}")
            print(f" Success rate: {(len(rapid_tasks) - rate_limit_errors) / len(rapid_tasks) * 100:.1f}%")


def generate_load_test_report(results: List[LoadTestResult]) -> str:
    """Generate comprehensive load test report."""
    report = []
    report.append("\n" + "=" * 60)
    report.append("LOAD TEST REPORT")
    report.append("=" * 60)

    for i, result in enumerate(results):
        report.append(f"\nTest Run {i + 1}:")
        report.append(f" Total Operations: {result.total_operations}")
        report.append(f" Success Rate: {result.successful_operations / result.total_operations * 100:.1f}%")
        report.append(f" Throughput: {result.throughput:.2f} ops/s")
        report.append(f" Latency (avg/p50/p95/p99/max): {result.average_latency:.2f}/{result.p50_latency:.2f}/{result.p95_latency:.2f}/{result.p99_latency:.2f}/{result.max_latency:.2f}s")

        if result.errors:
            report.append(" Errors:")
            for error_type, count in result.errors.items():
                report.append(f" {error_type}: {count}")

        report.append(" Resource Usage:")
        for metric, value in result.resource_usage.items():
            report.append(f" {metric}: {value:.2f}")

    report.append("=" * 60)
    return "\n".join(report)

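# Hypothetical usage sketch (not wired into the tests above): collect one
# LoadTestResult per scenario and print a combined summary at the end of a run.
#
#   all_results: List[LoadTestResult] = []
#   ...  # append tester.calculate_results() after each LoadTester run
#   print(generate_load_test_report(all_results))
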
if __name__ == "__main__":
    pytest.main([__file__, "-v", "--asyncio-mode=auto", "-m", "slow"])