Day 10: Testing
Welcome to Day 10! Today we'll learn how to write and run tests for Nexios applications.
Understanding Testing in Nexios
Testing covers:
- Unit tests
- Integration tests
- End-to-end tests
- Performance tests
- Security tests
- Coverage reporting
Basic Test Setup
python
import pytest
from nexios import NexiosApp
from nexios.http import Request, Response
from nexios.testing import Client
@pytest.fixture
def app():
app = NexiosApp()
@app.get("/")
async def hello(request: Request, response: Response):
return response.json({"message": "Hello, World!"})
return app
@pytest.fixture
def client(app):
return Client(app)
def test_hello_endpoint(client):
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"message": "Hello, World!"}
Testing Routes
python
from nexios import NexiosApp
from nexios.http import Request, Response
import pytest
from nexios.testing import Client
# Sample application
app = NexiosApp()
@app.get("/users/{user_id}")
async def get_user(request: Request, response: Response, user_id: int):
if user_id == 404:
return response.json(
{"error": "User not found"},
status_code=404
)
return response.json({
"id": user_id,
"name": "Test User"
})
@app.post("/users")
async def create_user(request: Request, response: Response):
data = await request.json()
if "name" not in data:
return response.json(
{"error": "Name is required"},
status_code=400
)
return response.json(
{"id": 1, **data},
status_code=201
)
# Tests
@pytest.fixture
def client():
return Client(app)
def test_get_user_success(client):
response = client.get("/users/1")
assert response.status_code == 200
assert response.json() == {
"id": 1,
"name": "Test User"
}
def test_get_user_not_found(client):
response = client.get("/users/404")
assert response.status_code == 404
assert response.json() == {"error": "User not found"}
def test_create_user_success(client):
response = client.post(
"/users",
json={"name": "New User"}
)
assert response.status_code == 201
assert response.json() == {
"id": 1,
"name": "New User"
}
def test_create_user_validation_error(client):
response = client.post("/users", json={})
assert response.status_code == 400
assert response.json() == {"error": "Name is required"}
Testing Database Operations
python
import pytest
from databases import Database
import sqlalchemy
from nexios import NexiosApp
from nexios.testing import Client
# Test database
TEST_DATABASE_URL = "sqlite:///./test.db"
# Models
metadata = sqlalchemy.MetaData()
users = sqlalchemy.Table(
"users",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("name", sqlalchemy.String),
sqlalchemy.Column("email", sqlalchemy.String, unique=True)
)
# Application with database
app = NexiosApp()
database = Database(TEST_DATABASE_URL)
app.state.database = database
@app.on_event("startup")
async def startup():
await database.connect()
engine = sqlalchemy.create_engine(TEST_DATABASE_URL)
metadata.create_all(engine)
@app.on_event("shutdown")
async def shutdown():
await database.disconnect()
@app.post("/users")
async def create_user(request: Request, response: Response):
data = await request.json()
query = users.insert().values(**data)
user_id = await database.execute(query)
return response.json({"id": user_id, **data}, status_code=201)
# Tests
@pytest.fixture
async def test_database():
await database.connect()
# Create tables
engine = sqlalchemy.create_engine(TEST_DATABASE_URL)
metadata.create_all(engine)
yield database
# Cleanup
metadata.drop_all(engine)
await database.disconnect()
@pytest.fixture
def client(test_database):
return Client(app)
async def test_create_user(client):
response = client.post(
"/users",
json={
"name": "Test User",
"email": "test@example.com"
}
)
assert response.status_code == 201
data = response.json()
assert data["name"] == "Test User"
assert data["email"] == "test@example.com"
# Verify in database
query = users.select().where(users.c.id == data["id"])
user = await database.fetch_one(query)
assert user["name"] == "Test User"
assert user["email"] == "test@example.com"
Testing WebSockets
python
import pytest
from nexios import NexiosApp, WebSocket
from nexios.testing import Client
app = NexiosApp()
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
await websocket.accept()
data = await websocket.receive_json()
await websocket.send_json({"message": f"Received: {data['message']}"})
await websocket.close()
@pytest.fixture
def client():
return Client(app)
async def test_websocket(client):
with client.websocket_connect("/ws") as websocket:
websocket.send_json({"message": "Hello"})
data = websocket.receive_json()
assert data == {"message": "Received: Hello"}
Testing Authentication
python
import pytest
from nexios import NexiosApp
from nexios.http import Request, Response
import jwt
SECRET_KEY = "test-secret"
app = NexiosApp()
async def auth_middleware(request: Request, response: Response, call_next):
auth = request.headers.get("Authorization")
if not auth or not auth.startswith("Bearer "):
return response.json(
{"error": "Missing token"},
status_code=401
)
token = auth.split(" ")[1]
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=["HS256"])
request.state.user = payload
except jwt.InvalidTokenError:
return response.json(
{"error": "Invalid token"},
status_code=401
)
return await call_next()
@app.get("/protected", middleware=[auth_middleware])
async def protected_route(request: Request, response: Response):
return response.json({
"message": f"Hello, {request.state.user['sub']}"
})
@pytest.fixture
def client():
return Client(app)
@pytest.fixture
def auth_token():
return jwt.encode(
{"sub": "testuser"},
SECRET_KEY,
algorithm="HS256"
)
def test_protected_route_with_token(client, auth_token):
response = client.get(
"/protected",
headers={"Authorization": f"Bearer {auth_token}"}
)
assert response.status_code == 200
assert response.json() == {"message": "Hello, testuser"}
def test_protected_route_without_token(client):
response = client.get("/protected")
assert response.status_code == 401
assert response.json() == {"error": "Missing token"}
def test_protected_route_invalid_token(client):
response = client.get(
"/protected",
headers={"Authorization": "Bearer invalid"}
)
assert response.status_code == 401
assert response.json() == {"error": "Invalid token"}
Testing Middleware
python
import pytest
from nexios import NexiosApp
from nexios.http import Request, Response
import time
app = NexiosApp()
async def timing_middleware(request: Request, response: Response, call_next):
start_time = time.time()
response = await call_next()
duration = time.time() - start_time
response.headers["X-Process-Time"] = f"{duration:.4f}"
return response
@app.get("/test", middleware=[timing_middleware])
async def test_route(request: Request, response: Response):
return response.json({"message": "Hello"})
@pytest.fixture
def client():
return Client(app)
def test_timing_middleware(client):
response = client.get("/test")
assert response.status_code == 200
assert "X-Process-Time" in response.headers
duration = float(response.headers["X-Process-Time"])
assert duration >= 0
Performance Testing
python
import pytest
from nexios import NexiosApp
from nexios.http import Request, Response
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor
app = NexiosApp()
@app.get("/fast")
async def fast_route(request: Request, response: Response):
return response.json({"message": "Fast response"})
@app.get("/slow")
async def slow_route(request: Request, response: Response):
await asyncio.sleep(0.1)
return response.json({"message": "Slow response"})
@pytest.fixture
def client():
return Client(app)
def test_route_performance(client):
# Test fast route
start_time = time.time()
response = client.get("/fast")
duration = time.time() - start_time
assert response.status_code == 200
assert duration < 0.01 # Should respond in less than 10ms
# Test slow route
start_time = time.time()
response = client.get("/slow")
duration = time.time() - start_time
assert response.status_code == 200
assert 0.1 <= duration <= 0.15 # Should take about 100ms
def test_concurrent_requests(client):
def make_request():
return client.get("/fast")
# Make 100 concurrent requests
with ThreadPoolExecutor(max_workers=10) as executor:
start_time = time.time()
responses = list(executor.map(make_request, range(100)))
duration = time.time() - start_time
# Verify responses
assert all(r.status_code == 200 for r in responses)
assert duration < 1.0 # Should handle 100 requests in less than 1 second
Mini-Project: Testing a Blog API
python
import pytest
from nexios import NexiosApp
from nexios.http import Request, Response
from nexios.testing import Client
from databases import Database
import sqlalchemy
from datetime import datetime
import jwt
# Database setup
TEST_DATABASE_URL = "sqlite:///./test_blog.db"
database = Database(TEST_DATABASE_URL)
metadata = sqlalchemy.MetaData()
# Models
users = sqlalchemy.Table(
"users",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("username", sqlalchemy.String, unique=True),
sqlalchemy.Column("password", sqlalchemy.String)
)
posts = sqlalchemy.Table(
"posts",
metadata,
sqlalchemy.Column("id", sqlalchemy.Integer, primary_key=True),
sqlalchemy.Column("title", sqlalchemy.String),
sqlalchemy.Column("content", sqlalchemy.String),
sqlalchemy.Column("user_id", sqlalchemy.ForeignKey("users.id")),
sqlalchemy.Column("created_at", sqlalchemy.DateTime, default=datetime.utcnow)
)
# Application
app = NexiosApp()
app.state.database = database
# Auth middleware
async def auth_middleware(request: Request, response: Response, call_next):
auth = request.headers.get("Authorization")
if not auth or not auth.startswith("Bearer "):
return response.json(
{"error": "Missing token"},
status_code=401
)
try:
token = auth.split(" ")[1]
payload = jwt.decode(token, "secret", algorithms=["HS256"])
request.state.user = payload
except jwt.InvalidTokenError:
return response.json(
{"error": "Invalid token"},
status_code=401
)
return await call_next()
# Routes
@app.post("/users")
async def create_user(request: Request, response: Response):
data = await request.json()
query = users.insert().values(**data)
user_id = await database.execute(query)
token = jwt.encode(
{"sub": user_id, "username": data["username"]},
"secret",
algorithm="HS256"
)
return response.json({
"id": user_id,
"username": data["username"],
"token": token
}, status_code=201)
@app.post("/posts", middleware=[auth_middleware])
async def create_post(request: Request, response: Response):
data = await request.json()
data["user_id"] = request.state.user["sub"]
query = posts.insert().values(**data)
post_id = await database.execute(query)
return response.json({
"id": post_id,
**data
}, status_code=201)
@app.get("/posts")
async def list_posts(request: Request, response: Response):
query = posts.select().order_by(posts.c.created_at.desc())
results = await database.fetch_all(query)
return response.json([dict(r) for r in results])
@app.get("/posts/{post_id}")
async def get_post(request: Request, response: Response, post_id: int):
query = posts.select().where(posts.c.id == post_id)
post = await database.fetch_one(query)
if not post:
return response.json(
{"error": "Post not found"},
status_code=404
)
return response.json(dict(post))
# Tests
@pytest.fixture
async def test_database():
await database.connect()
engine = sqlalchemy.create_engine(TEST_DATABASE_URL)
metadata.create_all(engine)
yield database
metadata.drop_all(engine)
await database.disconnect()
@pytest.fixture
def client(test_database):
return Client(app)
@pytest.fixture
async def test_user(client):
response = client.post(
"/users",
json={
"username": "testuser",
"password": "password"
}
)
return response.json()
async def test_create_user(client):
response = client.post(
"/users",
json={
"username": "newuser",
"password": "password"
}
)
assert response.status_code == 201
data = response.json()
assert data["username"] == "newuser"
assert "token" in data
async def test_create_post(client, test_user):
response = client.post(
"/posts",
json={
"title": "Test Post",
"content": "Test Content"
},
headers={"Authorization": f"Bearer {test_user['token']}"}
)
assert response.status_code == 201
data = response.json()
assert data["title"] == "Test Post"
assert data["content"] == "Test Content"
assert data["user_id"] == test_user["id"]
async def test_list_posts(client, test_user):
# Create test posts
for i in range(3):
client.post(
"/posts",
json={
"title": f"Post {i}",
"content": f"Content {i}"
},
headers={"Authorization": f"Bearer {test_user['token']}"}
)
response = client.get("/posts")
assert response.status_code == 200
posts = response.json()
assert len(posts) == 3
assert posts[0]["title"] == "Post 2" # Most recent first
async def test_get_post(client, test_user):
# Create post
post_response = client.post(
"/posts",
json={
"title": "Test Post",
"content": "Test Content"
},
headers={"Authorization": f"Bearer {test_user['token']}"}
)
post_id = post_response.json()["id"]
# Get post
response = client.get(f"/posts/{post_id}")
assert response.status_code == 200
post = response.json()
assert post["title"] == "Test Post"
assert post["content"] == "Test Content"
async def test_get_nonexistent_post(client):
response = client.get("/posts/999")
assert response.status_code == 404
assert response.json() == {"error": "Post not found"}
async def test_create_post_without_auth(client):
response = client.post(
"/posts",
json={
"title": "Test Post",
"content": "Test Content"
}
)
assert response.status_code == 401
assert response.json() == {"error": "Missing token"}
if __name__ == "__main__":
pytest.main([__file__])
Key Concepts Learned
- Test setup and fixtures
- Route testing
- Database testing
- WebSocket testing
- Authentication testing
- Middleware testing
- Performance testing
- Coverage reporting
Additional Resources
Homework
Create a test suite for an e-commerce API:
- Product management
- Shopping cart
- Order processing
- User accounts
Implement performance tests:
- Load testing
- Stress testing
- Endurance testing
- Spike testing
Build a CI/CD pipeline:
- Automated testing
- Code coverage
- Performance benchmarks
- Security scans
Next Steps
Tomorrow, we'll explore deployment in Day 11: Deployment.
🚀 Day 10: API Key Authentication
Creating API Keys
Implementing API key generation and management:
python
from nexios import get_application
from nexios.security import APIKeyAuth, generate_api_key
from nexios.http import Request, Response
from datetime import datetime, timedelta
import uuid
app = get_application()
# Configure API Key authentication
api_auth = APIKeyAuth(
prefix="nx", # API keys will start with "nx_"
key_length=32 # Length of the random part
)
app.auth_backend = api_auth
class APIKeyManager:
def __init__(self):
self.db = app.db # Your database connection
async def create_api_key(
self,
user_id: str,
name: str,
expires_in: timedelta = None
) -> dict:
# Generate new API key
api_key = await generate_api_key()
key_id = str(uuid.uuid4())
# Calculate expiration
expires_at = None
if expires_in:
expires_at = datetime.now() + expires_in
# Store key details (store hash, not the key itself)
key_hash = await api_auth.hash_key(api_key)
await self.db.api_keys.insert_one({
"id": key_id,
"user_id": user_id,
"name": name,
"key_hash": key_hash,
"created_at": datetime.now(),
"expires_at": expires_at,
"last_used": None
})
# Return the key (only time it's visible)
return {
"id": key_id,
"key": api_key,
"name": name,
"expires_at": expires_at
}
async def list_api_keys(self, user_id: str) -> list:
keys = await self.db.api_keys.find(
{"user_id": user_id}
).to_list(None)
return [{
"id": key["id"],
"name": key["name"],
"created_at": key["created_at"],
"expires_at": key["expires_at"],
"last_used": key["last_used"]
} for key in keys]
async def revoke_api_key(
self,
user_id: str,
key_id: str
) -> bool:
result = await self.db.api_keys.delete_one({
"id": key_id,
"user_id": user_id
})
return result.deleted_count > 0
# Initialize manager
key_manager = APIKeyManager()
@app.post("/api-keys")
@requires_auth # Regular user authentication
async def create_key(request: Request):
data = await request.json()
user_id = request.user.id
# Create new API key
key = await key_manager.create_api_key(
user_id=user_id,
name=data["name"],
expires_in=timedelta(days=data.get("expires_in_days", 365))
)
return key
@app.get("/api-keys")
@requires_auth
async def list_keys(request: Request):
keys = await key_manager.list_api_keys(request.user.id)
return {"api_keys": keys}
@app.delete("/api-keys/{key_id}")
@requires_auth
async def revoke_key(request: Request, key_id: str):
success = await key_manager.revoke_api_key(
request.user.id,
key_id
)
if not success:
return Response(
content={"error": "API key not found"},
status_code=404
)
return {"message": "API key revoked"}
Validating API Keys
Implementing API key validation:
python
from nexios.security import APIKeyValidator
from typing import Optional
class APIKeyValidator:
def __init__(self):
self.db = app.db
async def validate_key(self, api_key: str) -> Optional[dict]:
# Hash the provided key
key_hash = await api_auth.hash_key(api_key)
# Find key in database
key = await self.db.api_keys.find_one({
"key_hash": key_hash
})
if not key:
return None
# Check expiration
if key["expires_at"] and datetime.now() > key["expires_at"]:
return None
# Update last used timestamp
await self.db.api_keys.update_one(
{"id": key["id"]},
{"$set": {"last_used": datetime.now()}}
)
# Get associated user
user = await self.db.users.find_one({
"id": key["user_id"]
})
return {
"key_id": key["id"],
"user": user,
"name": key["name"]
}
# Initialize validator
key_validator = APIKeyValidator()
@app.middleware("http")
async def validate_api_key(request: Request, call_next):
# Skip validation for auth routes
if request.url.path.startswith("/auth"):
return await call_next(request)
# Check header first
api_key = request.headers.get("X-API-Key")
# Then check query parameter
if not api_key:
api_key = request.query_params.get("api_key")
if not api_key:
return Response(
content={"error": "API key required"},
status_code=401
)
# Validate key
key_info = await key_validator.validate_key(api_key)
if not key_info:
return Response(
content={"error": "Invalid API key"},
status_code=401
)
# Add key info to request
request.state.api_key = key_info
request.user = key_info["user"]
return await call_next(request)
Using API Keys
Examples of using API keys in requests:
python
# Using in header
@app.get("/api/data")
async def get_data(request: Request):
# API key already validated by middleware
user = request.user
key_info = request.state.api_key
return {
"data": "Your protected data",
"user": user.username,
"key_name": key_info["name"]
}
# Rate limiting by API key
from nexios.cache import RedisCache
from nexios.exceptions import RateLimitExceeded
class APIKeyRateLimiter:
def __init__(self, requests_per_minute: int = 60):
self.cache = RedisCache()
self.rate_limit = requests_per_minute
async def check_rate_limit(self, key_id: str) -> bool:
current = await self.cache.incr(f"rate:{key_id}")
if current == 1:
await self.cache.expire(f"rate:{key_id}", 60)
return current <= self.rate_limit
# Rate limiting middleware
rate_limiter = APIKeyRateLimiter(requests_per_minute=60)
@app.middleware("http")
async def rate_limit_by_key(request: Request, call_next):
if hasattr(request.state, "api_key"):
key_id = request.state.api_key["key_id"]
if not await rate_limiter.check_rate_limit(key_id):
return Response(
content={"error": "Rate limit exceeded"},
status_code=429
)
return await call_next(request)
📝 Practice Exercise
Build an API key management system:
- Key generation with custom prefixes
- Expiration dates
- Usage tracking
- Rate limiting
Implement security features:
- Key rotation
- Permissions per key
- IP restrictions
- Usage analytics
Create a key management UI:
- List active keys
- Create/revoke keys
- View usage stats
- Set permissions
📚 Additional Resources
🎯 Next Steps
Tomorrow in Day 11: Request Validation, we'll explore:
- Input validation
- Schema validation
- Custom validators
- Error handling