commit b4008eb461
Author: palanisd, 2025-12-12 10:42:23 +08:00 (committed via GitHub)
15 changed files with 1787 additions and 0 deletions

.github/INTEGRATION_TEST_SETUP.md (new file, 234 lines)
@@ -0,0 +1,234 @@
# GitHub Copilot Setup Steps for LightRAG Integration Testing
This document describes the steps needed to set up and run the LightRAG integration tests locally or in CI/CD.
## Prerequisites
- Python 3.10 or higher
- Docker and Docker Compose
- Git
## Local Setup Steps
### 1. Clone the Repository
```bash
git clone https://github.com/netbrah/LightRAG.git
cd LightRAG
```
### 2. Set Up Python Virtual Environment
```bash
python -m venv .venv
source .venv/bin/activate # On Windows: .venv\Scripts\activate
```
### 3. Install Python Dependencies
```bash
pip install --upgrade pip
pip install -e ".[api]"
pip install pytest pytest-asyncio httpx
```
### 4. Start Docker Services
The integration tests require three services:
- **Redis**: For KV and document status storage
- **Neo4j**: For graph storage
- **Milvus**: For vector storage
```bash
cd tests
docker-compose -f docker-compose.integration.yml up -d
```
### 5. Wait for Services to Be Ready
```bash
# Wait for Redis
until docker exec lightrag-test-redis redis-cli ping | grep -q PONG; do sleep 2; done
# Wait for Neo4j (may take up to 2 minutes)
until docker exec lightrag-test-neo4j cypher-shell -u neo4j -p testpassword123 "RETURN 1" 2>/dev/null | grep -q "1"; do sleep 5; done
# Wait for Milvus (may take up to 3 minutes)
until curl -s http://localhost:9091/healthz | grep -q "OK"; do sleep 5; done
```
### 6. Start Mock OpenAI Server
The mock server simulates OpenAI API responses for testing without requiring actual API keys.
```bash
cd tests
python mock_openai_server.py --host 127.0.0.1 --port 8000 &
MOCK_PID=$!
# Wait for it to be ready
until curl -s http://127.0.0.1:8000/health | grep -q "healthy"; do sleep 1; done
```
### 7. Prepare Test Environment
```bash
cd tests
cp .env.integration .env
mkdir -p test_inputs test_rag_storage
```
### 8. Start LightRAG Server
```bash
cd tests
lightrag-server &
LIGHTRAG_PID=$!
# Wait for it to be ready
until curl -s http://localhost:9621/health | grep -q "status"; do sleep 2; done
```
### 9. Run Integration Tests
```bash
cd tests
python integration_test.py
```
### 10. Cleanup
```bash
# Stop servers
kill $LIGHTRAG_PID
kill $MOCK_PID
# Stop Docker services
docker-compose -f docker-compose.integration.yml down -v
# Remove test artifacts
rm -rf test_inputs test_rag_storage .env
```
## Service Configuration Details
### Redis Configuration
- **Port**: 6379
- **Container**: lightrag-test-redis
- **Purpose**: KV storage and document status tracking
### Neo4j Configuration
- **HTTP Port**: 7474
- **Bolt Port**: 7687
- **Container**: lightrag-test-neo4j
- **Credentials**: neo4j/testpassword123
- **Purpose**: Graph knowledge base storage
### Milvus Configuration
- **API Port**: 19530
- **Health Port**: 9091
- **Container**: lightrag-test-milvus
- **Database**: default (`MILVUS_DB_NAME` in `tests/.env.integration`)
- **Purpose**: Vector embeddings storage
### Mock OpenAI Server Configuration
- **Port**: 8000
- **Endpoints**:
- `/v1/chat/completions` - Mock LLM responses
- `/v1/embeddings` - Mock embedding generation
- `/health` - Health check
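For a quick sanity check of these endpoints, a short script along the following lines can be used (illustrative only; it assumes the mock server from `tests/mock_openai_server.py` is already running on port 8000, and uses `httpx`, which is installed as a test dependency):
```python
import httpx

base = "http://127.0.0.1:8000"

# Health check: the mock server answers {"status": "healthy"}
print(httpx.get(f"{base}/health").json())

# Chat completion: the mock returns a canned, keyword-driven answer
chat = httpx.post(
    f"{base}/v1/chat/completions",
    json={"model": "gpt-5", "messages": [{"role": "user", "content": "Describe the code"}]},
)
print(chat.json()["choices"][0]["message"]["content"])

# Embeddings: deterministic vectors, 3072 dimensions by default
emb = httpx.post(
    f"{base}/v1/embeddings",
    json={"model": "text-embedding-3-large", "input": ["Calculator class"]},
)
print(len(emb.json()["data"][0]["embedding"]))
```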
### LightRAG Server Configuration
- **Port**: 9621
- **Configuration**: tests/.env.integration
- **Storage Backends**:
- KV: RedisKVStorage
- Doc Status: RedisDocStatusStorage
- Vector: MilvusVectorDBStorage
- Graph: Neo4JStorage
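Before starting the LightRAG server, it can help to confirm that the three storage services are reachable with the credentials above. A minimal sketch, assuming the `redis`, `neo4j`, and `pymilvus` client packages are installed (they are pulled in by the selected storage backends):
```python
import redis
from neo4j import GraphDatabase
from pymilvus import MilvusClient

# Redis: ping() returns True when the server is up
assert redis.Redis(host="localhost", port=6379).ping()

# Neo4j: raises if the Bolt endpoint or credentials are wrong
driver = GraphDatabase.driver("neo4j://localhost:7687", auth=("neo4j", "testpassword123"))
driver.verify_connectivity()
driver.close()

# Milvus: an empty collection list is expected on a fresh instance
client = MilvusClient(uri="http://localhost:19530")
print(client.list_collections())
```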
## CI/CD Integration
The integration tests run automatically in CI via GitHub Actions. See `.github/workflows/integration-test.yml` for the workflow configuration.
### Workflow Triggers
- Push to branches: main, dev, copilot/**
- Pull requests to: main, dev
- Manual workflow dispatch
### Workflow Steps
1. Checkout code
2. Set up Python environment
3. Install dependencies
4. Start Docker services (Redis, Neo4j, Milvus)
5. Wait for all services to be healthy
6. Start Mock OpenAI server
7. Configure test environment
8. Start LightRAG server
9. Run integration tests
10. Collect logs on failure
11. Cleanup all resources
## Test Coverage
The integration tests validate:
1. **Health Check**: Server availability and basic functionality
2. **Document Indexing**:
- File upload (C++ source files)
- Text insertion
- Multiple file formats
3. **Query Operations**:
- Naive mode
- Local mode
- Global mode
- Hybrid mode
4. **Structured Data Retrieval**:
- Entity extraction
- Relationship mapping
- Chunk retrieval
5. **Graph Operations**:
- Graph data retrieval
- Node and edge counting
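The query-mode and structured-data checks above reduce to two HTTP calls against the LightRAG API; the following sketch mirrors what `tests/integration_test.py` does (assuming the server is reachable at the default `http://localhost:9621`):
```python
import httpx

base = "http://localhost:9621"

# Plain query: the generated answer comes back in "response"
answer = httpx.post(
    f"{base}/query",
    json={"query": "What is the Calculator class?", "mode": "hybrid", "stream": False},
    timeout=120.0,
).json()
print(answer["response"][:200])

# Structured retrieval: "data" holds entities, relationships, and chunks
data = httpx.post(
    f"{base}/query/data",
    json={"query": "What classes are defined in the code?", "mode": "hybrid", "top_k": 10},
    timeout=120.0,
).json()["data"]
print(len(data["entities"]), len(data["relationships"]), len(data["chunks"]))
```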
## Sample Test Repository
The tests use a sample C++ repository located at `tests/sample_cpp_repo/`:
- **Files**: calculator.h, calculator.cpp, utils.h, utils.cpp, main.cpp
- **Purpose**: Demonstrates code indexing and querying capabilities
- **Content**: Simple calculator implementation with documentation
## Troubleshooting
### Services Not Starting
- Check Docker is running: `docker ps`
- Check port availability: `lsof -i :6379,7687,19530,8000,9621`
- Review Docker logs: `docker-compose -f tests/docker-compose.integration.yml logs`
### Mock Server Issues
- Verify port 8000 is available
- Check mock server logs
- Test health endpoint: `curl http://127.0.0.1:8000/health`
### LightRAG Server Issues
- Check environment file: `tests/.env`
- Review server logs: `cat tests/lightrag.log*`
- Verify storage connections
### Test Failures
- Ensure all services are healthy before running tests
- Check network connectivity between services
- Review test output for specific error messages
## Environment Variables
Key environment variables used in integration tests:
- `LIGHTRAG_API_URL`: LightRAG server URL (default: http://localhost:9621)
- `LLM_BINDING_HOST`: Mock OpenAI server URL (default: http://127.0.0.1:8000)
- `EMBEDDING_BINDING_HOST`: Mock embedding server URL (default: http://127.0.0.1:8000)
- `REDIS_URI`: Redis connection string
- `NEO4J_URI`: Neo4j connection string
- `MILVUS_URI`: Milvus connection string
All configurations are defined in `tests/.env.integration`.

.github/workflows/integration-test.yml (new file, 164 lines)
@@ -0,0 +1,164 @@
name: Integration Tests
on:
push:
pull_request:
workflow_dispatch:
jobs:
integration-test:
name: Full Integration Test
runs-on: ubuntu-latest
timeout-minutes: 30
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Python 3.11
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Cache pip packages
uses: actions/cache@v4
with:
path: ~/.cache/pip
key: ${{ runner.os }}-pip-integration-${{ hashFiles('**/pyproject.toml') }}
restore-keys: |
${{ runner.os }}-pip-integration-
${{ runner.os }}-pip-
- name: Install Python dependencies
run: |
python -m pip install --upgrade pip
pip install -e .[api,offline-storage]
pip install pytest pytest-asyncio httpx
- name: Create minimal frontend stub for testing
run: |
mkdir -p lightrag/api/webui
echo '<!DOCTYPE html><html><head><title>LightRAG Test</title></head><body><h1>Integration Test Mode</h1></body></html>' > lightrag/api/webui/index.html
echo "Created minimal frontend stub for integration testing"
- name: Start Docker services (Redis, Neo4j, Milvus)
run: |
cd tests
docker compose -f docker-compose.integration.yml up -d
echo "Waiting for services to be ready..."
- name: Wait for Redis
run: |
echo "Waiting for Redis to be ready..."
timeout 60 bash -c 'until docker exec lightrag-test-redis redis-cli ping | grep -q PONG; do sleep 2; done'
echo "✅ Redis is ready"
- name: Wait for Neo4j
run: |
echo "Waiting for Neo4j to be ready..."
timeout 120 bash -c 'until docker exec lightrag-test-neo4j cypher-shell -u neo4j -p testpassword123 "RETURN 1" 2>/dev/null | grep -q "1"; do sleep 5; done'
echo "✅ Neo4j is ready"
- name: Wait for Milvus
run: |
echo "Waiting for Milvus to be ready..."
timeout 180 bash -c 'until curl -s http://localhost:9091/healthz | grep -q "OK"; do sleep 5; done'
echo "✅ Milvus is ready"
- name: Verify services are running
run: |
docker ps
echo "Testing service connectivity..."
docker exec lightrag-test-redis redis-cli ping
docker exec lightrag-test-neo4j cypher-shell -u neo4j -p testpassword123 "RETURN 1"
curl -s http://localhost:9091/healthz
- name: Start Mock OpenAI Server
run: |
echo "Starting Mock OpenAI Server..."
cd tests
python mock_openai_server.py --host 127.0.0.1 --port 8000 &
MOCK_PID=$!
echo "MOCK_SERVER_PID=${MOCK_PID}" >> $GITHUB_ENV
# Wait for mock server to be ready
echo "Waiting for mock server to be ready..."
timeout 30 bash -c 'until curl -s http://127.0.0.1:8000/health | grep -q "healthy"; do sleep 1; done'
echo "✅ Mock OpenAI Server is ready (PID: ${MOCK_PID})"
- name: Prepare test environment
run: |
cd tests
cp .env.integration .env
mkdir -p test_inputs test_rag_storage
echo "Environment prepared for testing"
- name: Start LightRAG Server
run: |
cd tests
echo "Starting LightRAG Server..."
lightrag-server &
LIGHTRAG_PID=$!
echo "LIGHTRAG_SERVER_PID=${LIGHTRAG_PID}" >> $GITHUB_ENV
# Wait for LightRAG server to be ready
echo "Waiting for LightRAG server to be ready..."
timeout 60 bash -c 'until curl -s http://localhost:9621/health | grep -q "status"; do sleep 2; done'
echo "✅ LightRAG Server is ready (PID: ${LIGHTRAG_PID})"
- name: Run Integration Tests
run: |
cd tests
python integration_test.py
env:
LIGHTRAG_API_URL: http://localhost:9621
- name: Collect logs on failure
if: failure()
run: |
echo "=== LightRAG Server Logs ==="
cat tests/lightrag.log* 2>/dev/null || echo "No LightRAG logs found"
echo "=== Docker Service Logs ==="
docker compose -f tests/docker-compose.integration.yml logs
- name: Stop LightRAG Server
if: always()
run: |
if [ ! -z "$LIGHTRAG_SERVER_PID" ]; then
echo "Stopping LightRAG Server (PID: $LIGHTRAG_SERVER_PID)..."
kill $LIGHTRAG_SERVER_PID 2>/dev/null || true
sleep 2
fi
- name: Stop Mock OpenAI Server
if: always()
run: |
if [ ! -z "$MOCK_SERVER_PID" ]; then
echo "Stopping Mock OpenAI Server (PID: $MOCK_SERVER_PID)..."
kill $MOCK_SERVER_PID 2>/dev/null || true
fi
- name: Stop Docker services
if: always()
run: |
cd tests
docker compose -f docker-compose.integration.yml down -v
echo "Docker services stopped and volumes removed"
- name: Cleanup test artifacts
if: always()
run: |
cd tests
rm -rf test_inputs test_rag_storage .env
echo "Test artifacts cleaned up"
- name: Upload test artifacts
if: always()
uses: actions/upload-artifact@v4
with:
name: integration-test-artifacts
path: |
tests/lightrag.log*
tests/test_rag_storage/
retention-days: 7

lightrag/api/lightrag_server.py (modified)
@@ -1041,6 +1041,24 @@ def create_app(args):
name=args.simulated_model_name, tag=args.simulated_model_tag
)
# Check if we should use an offline-compatible tokenizer (for integration testing)
custom_tokenizer = None
if os.getenv("LIGHTRAG_OFFLINE_TOKENIZER", "false").lower() == "true":
logger.info("Using offline-compatible simple tokenizer for integration testing")
try:
# Import simple tokenizer for offline use
import sys
tests_dir = Path(__file__).parent.parent.parent / "tests"
if tests_dir.exists():
sys.path.insert(0, str(tests_dir))
from simple_tokenizer import create_simple_tokenizer
custom_tokenizer = create_simple_tokenizer()
logger.info("Successfully loaded offline tokenizer")
except Exception as e:
logger.warning(f"Failed to load offline tokenizer, using default: {e}")
# Initialize RAG with unified configuration
try:
rag = LightRAG(
@@ -1076,6 +1094,7 @@ def create_app(args):
"entity_types": args.entity_types,
},
ollama_server_infos=ollama_server_infos,
tokenizer=custom_tokenizer, # Pass custom tokenizer if available
)
except Exception as e:
logger.error(f"Failed to initialize LightRAG: {e}")

tests/.env.integration (new file, 120 lines)
@@ -0,0 +1,120 @@
# Integration Test Environment Configuration
# This file is used for integration testing with mock OpenAI server
###########################
### Server Configuration
###########################
HOST=0.0.0.0
PORT=9621
WEBUI_TITLE='Integration Test KB'
WEBUI_DESCRIPTION="Integration Test for LightRAG"
WORKERS=1
### Directory Configuration
INPUT_DIR=./test_inputs
WORKING_DIR=./test_rag_storage
### Use offline tokenizer (no internet required)
LIGHTRAG_OFFLINE_TOKENIZER=true
### Logging level
LOG_LEVEL=INFO
VERBOSE=False
#####################################
### Authentication (Disabled for tests)
#####################################
# No authentication required for testing
######################################################################################
### Query Configuration
######################################################################################
ENABLE_LLM_CACHE=true
TOP_K=20
CHUNK_TOP_K=10
MAX_ENTITY_TOKENS=4000
MAX_RELATION_TOKENS=4000
MAX_TOTAL_TOKENS=16000
########################################
### Document processing configuration
########################################
ENABLE_LLM_CACHE_FOR_EXTRACT=true
SUMMARY_LANGUAGE=English
### Entity types for code analysis
ENTITY_TYPES='["Class","Function","Variable","Module","Namespace","Struct","Enum","Method"]'
### Chunk size for document splitting
CHUNK_SIZE=800
CHUNK_OVERLAP_SIZE=100
###############################
### Concurrency Configuration
###############################
MAX_ASYNC=2
MAX_PARALLEL_INSERT=1
EMBEDDING_FUNC_MAX_ASYNC=4
EMBEDDING_BATCH_NUM=5
###########################################################################
### LLM Configuration (Mock OpenAI Server)
###########################################################################
LLM_BINDING=openai
LLM_MODEL=gpt-5
LLM_BINDING_HOST=http://127.0.0.1:8000
LLM_BINDING_API_KEY=mock-api-key-for-testing
LLM_TIMEOUT=60
### OpenAI Specific Parameters (for mock server)
OPENAI_LLM_REASONING_EFFORT=medium
OPENAI_LLM_MAX_COMPLETION_TOKENS=8000
OPENAI_LLM_TEMPERATURE=0.7
#######################################################################################
### Embedding Configuration (Mock OpenAI Server)
#######################################################################################
EMBEDDING_BINDING=openai
EMBEDDING_MODEL=text-embedding-3-large
EMBEDDING_DIM=3072
EMBEDDING_BINDING_HOST=http://127.0.0.1:8000
EMBEDDING_BINDING_API_KEY=mock-api-key-for-testing
EMBEDDING_TIMEOUT=30
EMBEDDING_SEND_DIM=false
####################################################################
### WORKSPACE
####################################################################
WORKSPACE=integration_test
############################
### Data storage selection
############################
### Redis Storage
LIGHTRAG_KV_STORAGE=RedisKVStorage
LIGHTRAG_DOC_STATUS_STORAGE=RedisDocStatusStorage
### Milvus Vector Storage
LIGHTRAG_VECTOR_STORAGE=MilvusVectorDBStorage
### Neo4j Graph Storage
LIGHTRAG_GRAPH_STORAGE=Neo4JStorage
### Redis Configuration
REDIS_URI=redis://localhost:6379
REDIS_SOCKET_TIMEOUT=30
REDIS_CONNECT_TIMEOUT=10
REDIS_MAX_CONNECTIONS=50
REDIS_RETRY_ATTEMPTS=3
### Neo4j Configuration
NEO4J_URI=neo4j://localhost:7687
NEO4J_USERNAME=neo4j
NEO4J_PASSWORD=testpassword123
NEO4J_DATABASE=neo4j
NEO4J_MAX_CONNECTION_POOL_SIZE=50
NEO4J_CONNECTION_TIMEOUT=30
### Milvus Configuration
MILVUS_URI=http://localhost:19530
MILVUS_DB_NAME=default

tests/docker-compose.integration.yml (new file, 102 lines)
@@ -0,0 +1,102 @@
version: '3.8'
services:
# Redis for KV and Doc Status storage
redis:
image: redis:7-alpine
container_name: lightrag-test-redis
ports:
- "6379:6379"
command: redis-server --appendonly yes
healthcheck:
test: ["CMD", "redis-cli", "ping"]
interval: 5s
timeout: 3s
retries: 5
# Neo4j for Graph storage
neo4j:
image: neo4j:5.17.0
container_name: lightrag-test-neo4j
ports:
- "7474:7474" # HTTP
- "7687:7687" # Bolt
environment:
- NEO4J_AUTH=neo4j/testpassword123
- NEO4J_PLUGINS=["apoc"]
- NEO4J_dbms_security_procedures_unrestricted=apoc.*
- NEO4J_dbms_memory_heap_initial__size=512m
- NEO4J_dbms_memory_heap_max__size=1G
healthcheck:
test: ["CMD-SHELL", "cypher-shell -u neo4j -p testpassword123 'RETURN 1'"]
interval: 10s
timeout: 10s
retries: 10
start_period: 40s
# Milvus etcd
etcd:
container_name: lightrag-test-milvus-etcd
image: quay.io/coreos/etcd:v3.5.5
environment:
- ETCD_AUTO_COMPACTION_MODE=revision
- ETCD_AUTO_COMPACTION_RETENTION=1000
- ETCD_QUOTA_BACKEND_BYTES=4294967296
- ETCD_SNAPSHOT_COUNT=50000
volumes:
- etcd-data:/etcd
command: etcd -advertise-client-urls=http://127.0.0.1:2379 -listen-client-urls http://0.0.0.0:2379 --data-dir /etcd
healthcheck:
test: ["CMD", "etcdctl", "endpoint", "health"]
interval: 30s
timeout: 20s
retries: 3
# Milvus MinIO
minio:
container_name: lightrag-test-milvus-minio
image: minio/minio:RELEASE.2023-03-20T20-16-18Z
environment:
MINIO_ROOT_USER: minioadmin
MINIO_ROOT_PASSWORD: minioadmin
ports:
- "9001:9001"
- "9000:9000"
volumes:
- minio-data:/minio_data
command: minio server /minio_data --console-address ":9001"
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9000/minio/health/live"]
interval: 30s
timeout: 20s
retries: 3
# Milvus Standalone
milvus:
container_name: lightrag-test-milvus
image: milvusdb/milvus:v2.4.0
command: ["milvus", "run", "standalone"]
security_opt:
- seccomp:unconfined
environment:
ETCD_ENDPOINTS: etcd:2379
MINIO_ADDRESS: minio:9000
volumes:
- milvus-data:/var/lib/milvus
healthcheck:
test: ["CMD", "curl", "-f", "http://localhost:9091/healthz"]
interval: 30s
start_period: 90s
timeout: 20s
retries: 3
ports:
- "19530:19530"
- "9091:9091"
depends_on:
- etcd
- minio
volumes:
etcd-data:
minio-data:
milvus-data:

tests/integration_test.py (new file, 366 lines)
@@ -0,0 +1,366 @@
#!/usr/bin/env python3
"""
Integration test script for LightRAG with production setup.
This script tests:
- Document indexing with C++ code repository
- Query operations (naive, local, global, hybrid)
- API endpoints (insert, query, graph retrieval)
- Integration with Redis, Neo4j, and Milvus storage backends
"""
import asyncio
import json
import os
import sys
import logging
from pathlib import Path
import httpx
# Configure logging
logging.basicConfig(
level=logging.INFO, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s"
)
logger = logging.getLogger(__name__)
class IntegrationTestRunner:
"""Integration test runner for LightRAG."""
def __init__(self, base_url: str = "http://localhost:9621"):
self.base_url = base_url
self.client = httpx.AsyncClient(timeout=120.0)
self.test_results = []
async def __aenter__(self):
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.client.aclose()
def log_result(self, test_name: str, passed: bool, message: str = ""):
"""Log test result."""
status = "✅ PASS" if passed else "❌ FAIL"
logger.info(f"{status} - {test_name}: {message}")
self.test_results.append(
{"test": test_name, "passed": passed, "message": message}
)
async def wait_for_server(self, max_retries: int = 30, retry_delay: int = 2):
"""Wait for LightRAG server to be ready."""
logger.info("Waiting for LightRAG server to be ready...")
for i in range(max_retries):
try:
response = await self.client.get(f"{self.base_url}/health")
if response.status_code == 200:
logger.info("✅ LightRAG server is ready!")
return True
except Exception as e:
logger.debug(f"Attempt {i+1}/{max_retries}: Server not ready yet - {e}")
await asyncio.sleep(retry_delay)
logger.error("❌ Server failed to become ready in time")
return False
async def test_health_endpoint(self):
"""Test health check endpoint."""
test_name = "Health Check"
try:
response = await self.client.get(f"{self.base_url}/health")
passed = response.status_code == 200
self.log_result(test_name, passed, f"Status: {response.status_code}")
return passed
except Exception as e:
self.log_result(test_name, False, f"Error: {e}")
return False
async def test_insert_text(self, text: str, description: str = ""):
"""Test document insertion via API."""
test_name = f"Insert Document{' - ' + description if description else ''}"
try:
response = await self.client.post(
f"{self.base_url}/documents/text",
json={"text": text, "description": description},
)
passed = response.status_code == 200
self.log_result(test_name, passed, f"Status: {response.status_code}")
return passed
except Exception as e:
self.log_result(test_name, False, f"Error: {e}")
return False
async def test_insert_file(self, file_path: Path, retry_count: int = 2):
"""Test file insertion via API with retry logic and fallback to text endpoint."""
test_name = f"Insert File - {file_path.name}"
# Check if this is a header file that should use text endpoint
use_text_endpoint = file_path.suffix in [".h", ".hpp", ".hh"]
for attempt in range(retry_count + 1):
try:
if use_text_endpoint:
# Use text insertion endpoint for header files
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
response = await self.client.post(
f"{self.base_url}/documents/text",
json={"text": content, "file_source": file_path.name},
)
else:
# Use file upload endpoint for other files
with open(file_path, "rb") as f:
files = {"file": (file_path.name, f, "text/plain")}
response = await self.client.post(
f"{self.base_url}/documents/upload", files=files
)
if response.status_code == 200:
self.log_result(test_name, True, f"Status: {response.status_code}")
return True
elif response.status_code == 400:
# Check if it's unsupported file type error
try:
error_detail = response.json()
error_msg = error_detail.get("detail", "")
if (
"Unsupported file type" in error_msg
and not use_text_endpoint
):
# Fallback to text endpoint
logger.info(
f"File type not supported for upload, trying text endpoint for {file_path.name}"
)
use_text_endpoint = True
continue
except (json.JSONDecodeError, ValueError, KeyError):
pass
self.log_result(test_name, False, f"Status: {response.status_code}")
return False
elif response.status_code == 500:
# Try to get error details
try:
error_detail = response.json()
error_msg = error_detail.get("detail", "Unknown error")
except (json.JSONDecodeError, ValueError, KeyError):
error_msg = (
response.text[:200] if response.text else "No error details"
)
if attempt < retry_count:
logger.warning(
f"Attempt {attempt + 1} failed for {file_path.name}: {error_msg}. Retrying..."
)
await asyncio.sleep(2) # Wait before retry
continue
else:
self.log_result(
test_name,
False,
f"Status: {response.status_code}, Error: {error_msg}",
)
return False
else:
self.log_result(test_name, False, f"Status: {response.status_code}")
return False
except Exception as e:
if attempt < retry_count:
logger.warning(
f"Attempt {attempt + 1} exception for {file_path.name}: {e}. Retrying..."
)
await asyncio.sleep(2)
continue
else:
self.log_result(test_name, False, f"Error: {e}")
return False
return False
async def test_query(self, query: str, mode: str = "hybrid"):
"""Test query endpoint."""
test_name = f"Query ({mode} mode)"
try:
response = await self.client.post(
f"{self.base_url}/query",
json={"query": query, "mode": mode, "stream": False},
)
passed = response.status_code == 200
if passed:
result = response.json()
response_text = result.get("response", "")
logger.info(f"Query response preview: {response_text[:200]}...")
self.log_result(test_name, passed, f"Status: {response.status_code}")
return passed
except Exception as e:
self.log_result(test_name, False, f"Error: {e}")
return False
async def test_query_with_data(self, query: str, mode: str = "hybrid"):
"""Test query/data endpoint that returns structured data."""
test_name = f"Query Data ({mode} mode)"
try:
response = await self.client.post(
f"{self.base_url}/query/data",
json={"query": query, "mode": mode, "top_k": 10},
)
passed = response.status_code == 200
if passed:
result = response.json()
# Validate response structure
has_data = "data" in result
has_metadata = "metadata" in result
if not (has_data and has_metadata):
passed = False
self.log_result(
test_name, passed, "Missing required fields in response"
)
else:
data = result.get("data", {})
entities_count = len(data.get("entities", []))
relations_count = len(data.get("relationships", []))
chunks_count = len(data.get("chunks", []))
logger.info(
f"Retrieved: {entities_count} entities, {relations_count} relations, {chunks_count} chunks"
)
self.log_result(
test_name, passed, f"Status: {response.status_code}"
)
else:
self.log_result(test_name, passed, f"Status: {response.status_code}")
return passed
except Exception as e:
self.log_result(test_name, False, f"Error: {e}")
return False
async def test_graph_data(self):
"""Test graph data retrieval endpoint."""
test_name = "Graph Data Retrieval"
try:
response = await self.client.get(f"{self.base_url}/graph/label/list")
passed = response.status_code == 200
if passed:
result = response.json()
# Result is a list of labels
if isinstance(result, list):
logger.info(f"Graph contains {len(result)} unique labels")
else:
logger.info(f"Graph data: {result}")
self.log_result(test_name, passed, f"Status: {response.status_code}")
return passed
except Exception as e:
self.log_result(test_name, False, f"Error: {e}")
return False
async def run_all_tests(self, cpp_repo_path: Path):
"""Run all integration tests."""
logger.info("=" * 80)
logger.info("Starting LightRAG Integration Tests")
logger.info("=" * 80)
# Wait for server to be ready
if not await self.wait_for_server():
logger.error("Server not ready. Aborting tests.")
return False
# Test 1: Health check
await self.test_health_endpoint()
# Test 2: Index C++ files
logger.info("\n--- Testing Document Indexing ---")
cpp_files = list(cpp_repo_path.glob("**/*.cpp")) + list(
cpp_repo_path.glob("**/*.h")
)
for cpp_file in cpp_files:
if cpp_file.is_file():
await self.test_insert_file(cpp_file)
await asyncio.sleep(
0.5
) # Small delay between uploads to avoid overwhelming server
# Also insert the README
readme_file = cpp_repo_path / "README.md"
if readme_file.exists():
await self.test_insert_file(readme_file)
# Wait a bit for indexing to complete
logger.info("Waiting for indexing to complete...")
await asyncio.sleep(5)
# Test 3: Query operations
logger.info("\n--- Testing Query Operations ---")
test_queries = [
("What is the Calculator class?", "hybrid"),
("Describe the main function", "local"),
("What mathematical operations are supported?", "global"),
("How does the power function work?", "naive"),
]
for query, mode in test_queries:
await self.test_query(query, mode)
await asyncio.sleep(1) # Brief delay between queries
# Test 4: Query with structured data
logger.info("\n--- Testing Query Data Endpoint ---")
await self.test_query_with_data(
"What classes are defined in the code?", "hybrid"
)
await self.test_query_with_data("List all functions", "local")
# Test 5: Graph data retrieval
logger.info("\n--- Testing Graph Retrieval ---")
await self.test_graph_data()
# Print summary
logger.info("\n" + "=" * 80)
logger.info("Test Summary")
logger.info("=" * 80)
total = len(self.test_results)
passed = sum(1 for r in self.test_results if r["passed"])
failed = total - passed
logger.info(f"Total Tests: {total}")
logger.info(f"Passed: {passed}")
logger.info(f"Failed: {failed}")
if failed > 0:
logger.info("\nFailed Tests:")
for result in self.test_results:
if not result["passed"]:
logger.info(f" - {result['test']}: {result['message']}")
return failed == 0
async def main():
"""Main test execution."""
# Get test repository path
script_dir = Path(__file__).parent
cpp_repo_path = script_dir / "sample_cpp_repo"
if not cpp_repo_path.exists():
logger.error(f"Sample C++ repository not found at {cpp_repo_path}")
return 1
# Get server URL from environment or use default
base_url = os.getenv("LIGHTRAG_API_URL", "http://localhost:9621")
# Run tests
async with IntegrationTestRunner(base_url) as runner:
success = await runner.run_all_tests(cpp_repo_path)
return 0 if success else 1
if __name__ == "__main__":
exit_code = asyncio.run(main())
sys.exit(exit_code)

tests/mock_openai_server.py (new file, 222 lines)
@@ -0,0 +1,222 @@
#!/usr/bin/env python3
"""
Mock OpenAI-compatible API server for integration testing.
This server mocks OpenAI's API endpoints for:
- Chat completions (LLM)
- Embeddings
Used for integration tests to avoid requiring actual API keys.
"""
import asyncio
import json
import logging
from datetime import datetime
from typing import List, Dict
import numpy as np
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse, StreamingResponse
import uvicorn
# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
app = FastAPI(title="Mock OpenAI API")
def generate_mock_embedding(text: str, dimensions: int = 3072) -> List[float]:
"""Generate deterministic mock embedding based on text content."""
# Use hash of text to generate deterministic embeddings
hash_value = hash(text)
np.random.seed(abs(hash_value) % (2**32))
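# Note: Python salts str hashes per process (PYTHONHASHSEED), so these vectors are
# stable within a single mock-server run, which is all the integration tests need.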
embedding = np.random.randn(dimensions).astype(float)
# Normalize to unit vector
norm = np.linalg.norm(embedding)
if norm > 0:
embedding = embedding / norm
return embedding.tolist()
def generate_mock_chat_response(messages: List[Dict], model: str = "gpt-5") -> str:
"""Generate mock chat completion response based on the query."""
# Extract the user's query
user_query = ""
for msg in messages:
if msg.get("role") == "user":
user_query = msg.get("content", "")
break
# Generate contextual responses based on keywords
if "entity" in user_query.lower() or "extract" in user_query.lower():
# Entity extraction response
response = json.dumps(
{
"entities": [
{"entity_name": "SampleClass", "entity_type": "Class"},
{"entity_name": "main", "entity_type": "Function"},
{"entity_name": "std::cout", "entity_type": "Component"},
],
"relationships": [
{
"src_id": "main",
"tgt_id": "SampleClass",
"description": "main function creates and uses SampleClass",
"keywords": "instantiation,usage",
}
],
}
)
elif "summary" in user_query.lower() or "summarize" in user_query.lower():
response = "This is a sample C++ program that demonstrates basic class usage and console output."
elif "theme" in user_query.lower():
response = "The main themes in this code are object-oriented programming, console I/O, and basic C++ syntax."
elif "describe" in user_query.lower():
response = "The code defines a simple C++ class with basic functionality and a main function that instantiates and uses the class."
else:
# Generic response
response = f"Mock response for query: {user_query[:100]}"
return response
@app.post("/v1/chat/completions")
@app.post("/chat/completions")
async def chat_completions(request: Request):
"""Mock chat completions endpoint."""
try:
data = await request.json()
logger.info(f"Received chat completion request: model={data.get('model')}")
messages = data.get("messages", [])
model = data.get("model", "gpt-5")
stream = data.get("stream", False)
response_text = generate_mock_chat_response(messages, model)
if stream:
# Streaming response
async def generate_stream():
# Split response into chunks
words = response_text.split()
for i, word in enumerate(words):
chunk = {
"id": f"chatcmpl-mock-{i}",
"object": "chat.completion.chunk",
"created": int(datetime.now().timestamp()),
"model": model,
"choices": [
{
"index": 0,
"delta": {"content": word + " "}
if i > 0
else {"role": "assistant", "content": word + " "},
"finish_reason": None,
}
],
}
yield f"data: {json.dumps(chunk)}\n\n"
await asyncio.sleep(0.01)
# Final chunk
final_chunk = {
"id": "chatcmpl-mock-final",
"object": "chat.completion.chunk",
"created": int(datetime.now().timestamp()),
"model": model,
"choices": [{"index": 0, "delta": {}, "finish_reason": "stop"}],
}
yield f"data: {json.dumps(final_chunk)}\n\n"
yield "data: [DONE]\n\n"
return StreamingResponse(generate_stream(), media_type="text/event-stream")
else:
# Non-streaming response
response = {
"id": "chatcmpl-mock",
"object": "chat.completion",
"created": int(datetime.now().timestamp()),
"model": model,
"choices": [
{
"index": 0,
"message": {"role": "assistant", "content": response_text},
"finish_reason": "stop",
}
],
"usage": {
"prompt_tokens": 50,
"completion_tokens": 100,
"total_tokens": 150,
},
}
return JSONResponse(content=response)
except Exception as e:
logger.error(f"Error in chat completions: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.post("/v1/embeddings")
@app.post("/embeddings")
async def embeddings(request: Request):
"""Mock embeddings endpoint."""
try:
data = await request.json()
logger.info(f"Received embeddings request: model={data.get('model')}")
input_texts = data.get("input", [])
if isinstance(input_texts, str):
input_texts = [input_texts]
model = data.get("model", "text-embedding-3-large")
dimensions = data.get("dimensions", 3072)
# Generate embeddings for each text
embeddings_data = []
for i, text in enumerate(input_texts):
embedding = generate_mock_embedding(text, dimensions)
embeddings_data.append(
{"object": "embedding", "embedding": embedding, "index": i}
)
response = {
"object": "list",
"data": embeddings_data,
"model": model,
"usage": {
"prompt_tokens": len(input_texts) * 10,
"total_tokens": len(input_texts) * 10,
},
}
return JSONResponse(content=response)
except Exception as e:
logger.error(f"Error in embeddings: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/health")
async def health():
"""Health check endpoint."""
return {"status": "healthy"}
def main():
"""Run the mock OpenAI server."""
import argparse
parser = argparse.ArgumentParser(description="Mock OpenAI API Server")
parser.add_argument("--host", default="127.0.0.1", help="Host to bind to")
parser.add_argument("--port", type=int, default=8000, help="Port to bind to")
args = parser.parse_args()
logger.info(f"Starting Mock OpenAI API server on {args.host}:{args.port}")
uvicorn.run(app, host=args.host, port=args.port, log_level="info")
if __name__ == "__main__":
main()

tests/sample_cpp_repo/README.md (new file, 18 lines)
@@ -0,0 +1,18 @@
# Sample C++ Project
This is a simple C++ project used for integration testing of LightRAG.
## Files
- `main.cpp` - Main application entry point
- `calculator.h` - Calculator class header
- `calculator.cpp` - Calculator class implementation
- `utils.h` - Utility functions header
- `utils.cpp` - Utility functions implementation
## Building
```bash
g++ -o sample_app main.cpp calculator.cpp utils.cpp
./sample_app
```

tests/sample_cpp_repo/calculator.cpp (new file, 75 lines)
@@ -0,0 +1,75 @@
#include "calculator.h"
#include <iostream>
#include <cmath>
#include <stdexcept>
Calculator::Calculator() : operationCount(0), lastResult(0.0) {
std::cout << "Calculator initialized" << std::endl;
}
Calculator::~Calculator() {
std::cout << "Calculator destroyed" << std::endl;
}
double Calculator::add(double a, double b) {
operationCount++;
lastResult = a + b;
return lastResult;
}
double Calculator::subtract(double a, double b) {
operationCount++;
lastResult = a - b;
return lastResult;
}
double Calculator::multiply(double a, double b) {
operationCount++;
lastResult = a * b;
return lastResult;
}
double Calculator::divide(double a, double b) {
if (b == 0) {
throw std::runtime_error("Division by zero error");
}
operationCount++;
lastResult = a / b;
return lastResult;
}
double Calculator::power(double base, int exponent) {
operationCount++;
lastResult = std::pow(base, exponent);
return lastResult;
}
double Calculator::squareRoot(double number) {
if (number < 0) {
throw std::runtime_error("Cannot calculate square root of negative number");
}
operationCount++;
lastResult = std::sqrt(number);
return lastResult;
}
double Calculator::getLastResult() const {
return lastResult;
}
int Calculator::getOperationCount() const {
return operationCount;
}
void Calculator::reset() {
operationCount = 0;
lastResult = 0.0;
std::cout << "Calculator reset" << std::endl;
}
void Calculator::displayStatistics() const {
std::cout << "\\n=== Calculator Statistics ===" << std::endl;
std::cout << "Operations performed: " << operationCount << std::endl;
std::cout << "Last result: " << lastResult << std::endl;
std::cout << "===========================\\n" << std::endl;
}

tests/sample_cpp_repo/calculator.h (new file, 94 lines)
@@ -0,0 +1,94 @@
#ifndef CALCULATOR_H
#define CALCULATOR_H
/**
* Calculator class for performing mathematical operations
* Provides basic arithmetic and advanced mathematical functions
*/
class Calculator {
private:
int operationCount; // Track number of operations performed
double lastResult; // Store the result of the last operation
public:
/**
* Constructor - initializes the calculator
*/
Calculator();
/**
* Destructor - cleans up resources
*/
~Calculator();
/**
* Add two numbers
* @param a First number
* @param b Second number
* @return Sum of a and b
*/
double add(double a, double b);
/**
* Subtract two numbers
* @param a First number
* @param b Second number
* @return Difference of a and b
*/
double subtract(double a, double b);
/**
* Multiply two numbers
* @param a First number
* @param b Second number
* @return Product of a and b
*/
double multiply(double a, double b);
/**
* Divide two numbers
* @param a Dividend
* @param b Divisor
* @return Quotient of a divided by b
*/
double divide(double a, double b);
/**
* Calculate power of a number
* @param base Base number
* @param exponent Exponent
* @return base raised to the power of exponent
*/
double power(double base, int exponent);
/**
* Calculate square root of a number
* @param number Input number
* @return Square root of the number
*/
double squareRoot(double number);
/**
* Get the last computed result
* @return Last result value
*/
double getLastResult() const;
/**
* Get the number of operations performed
* @return Operation count
*/
int getOperationCount() const;
/**
* Reset the calculator state
*/
void reset();
/**
* Display calculator statistics
*/
void displayStatistics() const;
};
#endif // CALCULATOR_H

tests/sample_cpp_repo/main.cpp (new file, 33 lines)
@@ -0,0 +1,33 @@
#include <iostream>
#include "calculator.h"
#include "utils.h"
/**
* Main application entry point
* Demonstrates the usage of Calculator class and utility functions
*/
int main() {
// Print welcome message
printWelcomeMessage();
// Create calculator instance
Calculator calc;
// Perform basic arithmetic operations
std::cout << "Addition: 5 + 3 = " << calc.add(5, 3) << std::endl;
std::cout << "Subtraction: 5 - 3 = " << calc.subtract(5, 3) << std::endl;
std::cout << "Multiplication: 5 * 3 = " << calc.multiply(5, 3) << std::endl;
std::cout << "Division: 6 / 2 = " << calc.divide(6, 2) << std::endl;
// Test advanced operations
std::cout << "Power: 2^8 = " << calc.power(2, 8) << std::endl;
std::cout << "Square root: sqrt(16) = " << calc.squareRoot(16) << std::endl;
// Display statistics
calc.displayStatistics();
// Print goodbye message
printGoodbyeMessage();
return 0;
}

tests/sample_cpp_repo/utils.cpp (new file, 46 lines)
@@ -0,0 +1,46 @@
#include "utils.h"
#include <iostream>
#include <iomanip>
#include <sstream>
#include <cmath>
void printWelcomeMessage() {
std::cout << "\\n=====================================" << std::endl;
std::cout << " Welcome to Calculator Demo!" << std::endl;
std::cout << "=====================================\\n" << std::endl;
}
void printGoodbyeMessage() {
std::cout << "\\n=====================================" << std::endl;
std::cout << " Thank you for using Calculator!" << std::endl;
std::cout << "=====================================\\n" << std::endl;
}
std::string formatNumber(double number, int precision) {
std::ostringstream stream;
stream << std::fixed << std::setprecision(precision) << number;
return stream.str();
}
bool isPrime(int number) {
if (number <= 1) return false;
if (number <= 3) return true;
if (number % 2 == 0 || number % 3 == 0) return false;
for (int i = 5; i * i <= number; i += 6) {
if (number % i == 0 || number % (i + 2) == 0)
return false;
}
return true;
}
long long factorial(int n) {
if (n < 0) return -1; // Error case
if (n == 0 || n == 1) return 1;
long long result = 1;
for (int i = 2; i <= n; i++) {
result *= i;
}
return result;
}

tests/sample_cpp_repo/utils.h (new file, 38 lines)
@@ -0,0 +1,38 @@
#ifndef UTILS_H
#define UTILS_H
#include <string>
/**
* Print a welcome message to the console
*/
void printWelcomeMessage();
/**
* Print a goodbye message to the console
*/
void printGoodbyeMessage();
/**
* Format a number with specified precision
* @param number Number to format
* @param precision Number of decimal places
* @return Formatted string representation
*/
std::string formatNumber(double number, int precision);
/**
* Check if a number is prime
* @param number Number to check
* @return true if prime, false otherwise
*/
bool isPrime(int number);
/**
* Calculate factorial of a number
* @param n Input number
* @return Factorial of n
*/
long long factorial(int n);
#endif // UTILS_H

tests/simple_tokenizer.py (new file, 224 lines)
@@ -0,0 +1,224 @@
"""
Simple tokenizer implementation for offline integration testing.
This tokenizer doesn't require internet access and provides a basic
word-based tokenization suitable for testing purposes.
"""
from typing import List
import re
class SimpleTokenizerImpl:
"""
A simple word-based tokenizer that works offline.
This tokenizer:
- Splits text into words and punctuation
- Doesn't require downloading any external files
- Provides deterministic token IDs based on a vocabulary
"""
def __init__(self):
# Build a simple vocabulary for common tokens
# This is a simplified approach - real tokenizers have much larger vocabularies
self.vocab = self._build_vocab()
self.inverse_vocab = {v: k for k, v in self.vocab.items()}
self.unk_token_id = len(self.vocab)
def _build_vocab(self) -> dict:
"""Build a basic vocabulary of common tokens."""
vocab = {}
current_id = 0
# Add common words and symbols
common_tokens = [
# Whitespace and punctuation
" ",
"\n",
"\t",
".",
",",
"!",
"?",
";",
":",
"(",
")",
"[",
"]",
"{",
"}",
'"',
"'",
"-",
"_",
"/",
"\\",
"@",
"#",
"$",
"%",
"&",
"*",
"+",
"=",
# Common programming keywords (for C++ code)
"class",
"struct",
"public",
"private",
"protected",
"void",
"int",
"double",
"float",
"char",
"bool",
"if",
"else",
"for",
"while",
"return",
"include",
"namespace",
"using",
"const",
"static",
"virtual",
"new",
"delete",
"this",
"nullptr",
"true",
"false",
# Common English words
"the",
"a",
"an",
"and",
"or",
"but",
"in",
"on",
"at",
"to",
"from",
"with",
"by",
"for",
"of",
"is",
"are",
"was",
"were",
"be",
"been",
"being",
"have",
"has",
"had",
"do",
"does",
"did",
"will",
"would",
"should",
"could",
"can",
"may",
"might",
"must",
"not",
"no",
"yes",
"this",
"that",
"these",
"those",
"what",
"which",
"who",
"when",
"where",
"why",
"how",
]
for token in common_tokens:
vocab[token.lower()] = current_id
current_id += 1
return vocab
def _tokenize(self, text: str) -> List[str]:
"""Split text into tokens (words and punctuation)."""
# Simple pattern to split on whitespace and keep punctuation separate
pattern = r"\w+|[^\w\s]"
tokens = re.findall(pattern, text)
return tokens
def encode(self, content: str) -> List[int]:
"""
Encode a string into a list of token IDs.
Args:
content: The string to encode.
Returns:
A list of integer token IDs.
"""
if not content:
return []
tokens = self._tokenize(content)
token_ids = []
for token in tokens:
token_lower = token.lower()
if token_lower in self.vocab:
token_ids.append(self.vocab[token_lower])
else:
# For unknown tokens, use a hash-based ID to be deterministic
# Offset by vocab size to avoid collisions
hash_id = abs(hash(token)) % 10000 + len(self.vocab)
token_ids.append(hash_id)
return token_ids
def decode(self, tokens: List[int]) -> str:
"""
Decode a list of token IDs into a string.
Args:
tokens: The list of token IDs to decode.
Returns:
The decoded string.
"""
if not tokens:
return ""
words = []
for token_id in tokens:
if token_id in self.inverse_vocab:
words.append(self.inverse_vocab[token_id])
else:
# For unknown IDs, use a placeholder
words.append(f"<unk_{token_id}>")
# Simple reconstruction - join words with spaces
# This is a simplification; real tokenizers preserve exact spacing
return " ".join(words)
def create_simple_tokenizer():
"""
Create a simple tokenizer for offline use.
Returns:
A Tokenizer instance using SimpleTokenizerImpl.
"""
from lightrag.utils import Tokenizer
return Tokenizer("simple-tokenizer", SimpleTokenizerImpl())
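A quick round-trip with the implementation above (illustrative, not part of the committed file; run it from the `tests` directory so `simple_tokenizer` is importable):
```python
from simple_tokenizer import SimpleTokenizerImpl

tok = SimpleTokenizerImpl()
ids = tok.encode("class Calculator { double add(double a, double b); };")
print(ids[:8])          # small IDs for vocabulary hits, hash-based IDs otherwise
print(tok.decode(ids))  # spacing is approximate: decode joins tokens with single spaces
```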

tests/start_server_offline.py (new executable file, 32 lines)
@@ -0,0 +1,32 @@
#!/usr/bin/env python3
"""
Start LightRAG server for integration testing with offline-compatible tokenizer.
This script initializes the LightRAG server with a simple tokenizer that doesn't
require internet access, making it suitable for integration testing in restricted
network environments.
"""
import os
import sys
from pathlib import Path
# Add the tests directory (where this script lives) to sys.path so simple_tokenizer can be imported
sys.path.insert(0, str(Path(__file__).parent))
def start_server():
"""Start LightRAG server with offline-compatible configuration."""
# Import here after setting up the path
from lightrag.api.lightrag_server import main
# Enable the offline tokenizer via environment variable before the server starts;
# create_app() checks LIGHTRAG_OFFLINE_TOKENIZER when building the LightRAG instance
os.environ["LIGHTRAG_OFFLINE_TOKENIZER"] = "true"
# Start the server
main()
if __name__ == "__main__":
start_server()