Python SDK Integration¶
The USSO Python SDK provides a plug-and-play authentication solution for Python applications. It supports FastAPI, Django, and direct JWT verification.
Installation¶
Install the USSO SDK via pip:
For specific framework support:
# For FastAPI applications
pip install "usso[fastapi]"
# For Django applications
pip install "usso[django]"
# Install all extras
pip install "usso[all]"
Quick Start¶
Basic Token Verification¶
from usso import USSOAuth
from usso.config import JWTConfig
# Configure USSO
config = JWTConfig(
jwks_url="http://localhost:8000/.well-known/jwks.json",
issuer="http://localhost:8000",
audience="your-app"
)
auth = USSOAuth(config)
# Verify a token
try:
user_data = auth.verify_token(access_token)
print(f"User ID: {user_data.sub}")
print(f"Roles: {user_data.roles}")
print(f"Scopes: {user_data.scopes}")
except Exception as e:
print(f"Token verification failed: {e}")
FastAPI Integration¶
The SDK provides seamless FastAPI integration with dependency injection:
Basic Setup¶
from fastapi import FastAPI, Depends
from usso.integrations.fastapi import get_authenticator
from usso.config import JWTConfig
from usso.schemas import UserData
# Configure USSO
config = JWTConfig(
jwks_url="http://localhost:8000/.well-known/jwks.json",
issuer="http://localhost:8000",
audience="your-app"
)
# Create authenticator dependency
authenticator = get_authenticator(config)
app = FastAPI()
@app.get("/protected")
def protected_route(user: UserData = Depends(authenticator)):
return {
"message": "Access granted",
"user_id": user.sub,
"roles": user.roles,
"scopes": user.scopes
}
Multiple Token Sources¶
The SDK automatically checks multiple sources for tokens:
- Authorization header:
Authorization: Bearer <token> - Custom headers: Configurable header name
- Cookies: HTTP-only secure cookies
from usso.config import JWTConfig, JWTHeaderConfig
config = JWTConfig(
jwks_url="http://localhost:8000/.well-known/jwks.json",
issuer="http://localhost:8000",
audience="your-app",
# Configure token sources
header=JWTHeaderConfig(
type="Authorization", # Header name
cookie_name="access_token" # Cookie name
)
)
Scope-Based Authorization¶
from fastapi import FastAPI, Depends, HTTPException, status
from usso.integrations.fastapi import get_authenticator
from usso.schemas import UserData
authenticator = get_authenticator(config)
app = FastAPI()
def require_scope(required_scope: str):
"""Dependency factory for scope checking"""
def scope_checker(user: UserData = Depends(authenticator)):
if required_scope not in user.scopes:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Required scope '{required_scope}' not found"
)
return user
return scope_checker
# Only users with 'write:posts' scope can access
@app.post("/posts")
def create_post(
post_data: dict,
user: UserData = Depends(require_scope("write:posts"))
):
return {"message": "Post created", "author": user.sub}
# Only users with 'delete:posts' scope can access
@app.delete("/posts/{post_id}")
def delete_post(
post_id: str,
user: UserData = Depends(require_scope("delete:posts"))
):
return {"message": f"Post {post_id} deleted"}
Role-Based Authorization¶
def require_role(required_role: str):
"""Dependency factory for role checking"""
def role_checker(user: UserData = Depends(authenticator)):
if required_role not in user.roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Required role '{required_role}' not found"
)
return user
return role_checker
# Only admins can access
@app.get("/admin/users")
def list_all_users(admin: UserData = Depends(require_role("admin"))):
# Return all users
return {"users": [...]}
# Editors and admins can access
def require_any_role(*roles):
def role_checker(user: UserData = Depends(authenticator)):
if not any(role in user.roles for role in roles):
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"One of roles {roles} required"
)
return user
return role_checker
@app.post("/content")
def create_content(
content: dict,
user: UserData = Depends(require_any_role("editor", "admin"))
):
return {"message": "Content created"}
Workspace Isolation¶
from fastapi import FastAPI, Depends
from usso.schemas import UserData
from sqlalchemy import select
@app.get("/projects")
def list_projects(
user: UserData = Depends(authenticator),
db: Session = Depends(get_db)
):
# Filter by user's workspace
projects = db.execute(
select(Project).where(Project.workspace_id == user.workspace_id)
).scalars().all()
return {"projects": projects}
@app.post("/projects")
def create_project(
project_data: dict,
user: UserData = Depends(authenticator),
db: Session = Depends(get_db)
):
# Automatically assign to user's workspace
project = Project(
**project_data,
workspace_id=user.workspace_id,
created_by=user.sub
)
db.add(project)
db.commit()
return {"project": project}
Configuration Options¶
JWTConfig¶
Complete configuration options:
from usso.config import JWTConfig, JWTHeaderConfig
from usso.jwt.enums import Algorithm
config = JWTConfig(
# Required: JWT validation
issuer="http://localhost:8000",
audience="your-app",
# Option 1: Use JWKS URL (recommended)
jwks_url="http://localhost:8000/.well-known/jwks.json",
# Option 2: Provide public key directly
# key="-----BEGIN PUBLIC KEY-----\n...",
# type=Algorithm.EdDSA,
# Token extraction
header=JWTHeaderConfig(
type="Authorization", # Header name
prefix="Bearer", # Token prefix
cookie_name="access_token" # Cookie name (optional)
),
# Validation options
verify_exp=True, # Verify expiration
verify_nbf=True, # Verify not-before
verify_aud=True, # Verify audience
verify_iss=True, # Verify issuer
# Clock skew tolerance (seconds)
leeway=10
)
Algorithm Support¶
USSO supports multiple JWT algorithms:
| Algorithm | Description | Use Case |
|---|---|---|
| EdDSA | Ed25519 (default) | Recommended for performance |
| RS256 | RSA 2048-bit | Legacy compatibility |
| RS384 | RSA 3072-bit | Higher security |
| RS512 | RSA 4096-bit | Maximum security |
| ES256 | ECDSA P-256 | Mobile-friendly |
| ES384 | ECDSA P-384 | Higher security |
| HS256 | HMAC SHA-256 | Symmetric (not recommended) |
from usso.jwt.enums import Algorithm
# For EdDSA (recommended)
config = JWTConfig(
key="your-ed25519-public-key",
type=Algorithm.EdDSA,
issuer="...",
audience="..."
)
# For RS256
config = JWTConfig(
key="-----BEGIN PUBLIC KEY-----\n...",
type=Algorithm.RS256,
issuer="...",
audience="..."
)
Django Integration¶
Middleware Setup¶
# settings.py
MIDDLEWARE = [
# ... other middleware
'usso.integrations.django.middleware.USSOAuthMiddleware',
]
# USSO Configuration
USSO_CONFIG = {
'JWKS_URL': 'http://localhost:8000/.well-known/jwks.json',
'ISSUER': 'http://localhost:8000',
'AUDIENCE': 'your-app',
'VERIFY_EXP': True,
'VERIFY_AUD': True,
}
Using in Views¶
from django.http import JsonResponse
from usso.integrations.django.decorators import require_auth, require_scope
@require_auth
def protected_view(request):
user = request.usso_user
return JsonResponse({
'user_id': user.sub,
'roles': user.roles
})
@require_scope('write:posts')
def create_post(request):
user = request.usso_user
# Create post
return JsonResponse({'message': 'Post created'})
Class-Based Views¶
from django.views import View
from django.http import JsonResponse
from usso.integrations.django.mixins import USSOAuthMixin
class ProtectedView(USSOAuthMixin, View):
required_scopes = ['read:data']
def get(self, request):
user = request.usso_user
return JsonResponse({
'data': 'Protected data',
'user': user.sub
})
Advanced Usage¶
Custom User Data¶
Extend the UserData schema for custom claims:
from usso.schemas import UserData as BaseUserData
from pydantic import Field
class CustomUserData(BaseUserData):
"""Extended user data with custom claims"""
company_id: str | None = Field(None, description="Company ID")
subscription_tier: str | None = Field(None, description="Subscription level")
custom_permissions: list[str] = Field(default_factory=list)
# Use custom schema
auth = USSOAuth(config, user_schema=CustomUserData)
# Verify token
user = auth.verify_token(token)
print(f"Company: {user.company_id}")
print(f"Tier: {user.subscription_tier}")
Token Caching¶
For high-traffic applications, cache JWKS:
from usso.config import JWTConfig
from cachetools import TTLCache
# Cache JWKS for 1 hour
jwks_cache = TTLCache(maxsize=100, ttl=3600)
config = JWTConfig(
jwks_url="http://localhost:8000/.well-known/jwks.json",
issuer="http://localhost:8000",
audience="your-app",
jwks_cache=jwks_cache
)
Multiple Tenants¶
Support multiple tenants with different configurations:
from fastapi import FastAPI, Depends, Request
from usso.integrations.fastapi import get_authenticator
from usso.config import JWTConfig
app = FastAPI()
# Tenant configurations
TENANT_CONFIGS = {
"tenant1": JWTConfig(
jwks_url="http://tenant1.usso.io/.well-known/jwks.json",
issuer="http://tenant1.usso.io",
audience="app1"
),
"tenant2": JWTConfig(
jwks_url="http://tenant2.usso.io/.well-known/jwks.json",
issuer="http://tenant2.usso.io",
audience="app2"
),
}
def get_tenant_authenticator(request: Request):
"""Get authenticator based on tenant"""
# Extract tenant from subdomain or header
tenant_id = request.headers.get("X-Tenant-ID")
config = TENANT_CONFIGS.get(tenant_id)
if not config:
raise HTTPException(status_code=400, detail="Invalid tenant")
return get_authenticator(config)
@app.get("/data")
def get_data(user: UserData = Depends(get_tenant_authenticator)):
return {"tenant": user.tenant_id, "data": [...]}
Async Token Verification¶
For async applications:
import asyncio
from usso import USSOAuth
auth = USSOAuth(config)
async def verify_async(token: str):
"""Async token verification"""
# Run verification in thread pool
loop = asyncio.get_event_loop()
user = await loop.run_in_executor(None, auth.verify_token, token)
return user
# Usage in FastAPI
@app.get("/async-protected")
async def async_protected():
token = request.headers.get("authorization", "").split(" ")[1]
user = await verify_async(token)
return {"user": user.sub}
Error Handling¶
The SDK raises specific exceptions:
from fastapi import FastAPI, Request, status
from fastapi.responses import JSONResponse
from usso.exceptions import (
USSOException,
TokenExpired,
TokenInvalid,
TokenNotFound,
InsufficientScope
)
app = FastAPI()
@app.exception_handler(TokenExpired)
async def token_expired_handler(request: Request, exc: TokenExpired):
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"error": "Token expired", "detail": str(exc)}
)
@app.exception_handler(TokenInvalid)
async def token_invalid_handler(request: Request, exc: TokenInvalid):
return JSONResponse(
status_code=status.HTTP_401_UNAUTHORIZED,
content={"error": "Invalid token", "detail": str(exc)}
)
@app.exception_handler(InsufficientScope)
async def insufficient_scope_handler(request: Request, exc: InsufficientScope):
return JSONResponse(
status_code=status.HTTP_403_FORBIDDEN,
content={"error": "Insufficient permissions", "detail": str(exc)}
)
@app.exception_handler(USSOException)
async def usso_exception_handler(request: Request, exc: USSOException):
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content={"error": "Authentication error", "detail": str(exc)}
)
Testing¶
Mocking Authentication¶
import pytest
from fastapi.testclient import TestClient
from usso.schemas import UserData
@pytest.fixture
def mock_user():
"""Create mock user for testing"""
return UserData(
sub="user:test123",
tenant_id="org_test",
workspace_id="ws_test",
roles=["editor"],
scopes=["read:posts", "write:posts"],
exp=9999999999 # Far future
)
@pytest.fixture
def authenticated_client(app, mock_user):
"""Create test client with mocked auth"""
def mock_authenticator():
return mock_user
app.dependency_overrides[authenticator] = mock_authenticator
client = TestClient(app)
yield client
app.dependency_overrides.clear()
def test_protected_endpoint(authenticated_client):
"""Test protected endpoint with mocked user"""
response = authenticated_client.get("/protected")
assert response.status_code == 200
assert response.json()["user_id"] == "user:test123"
Integration Testing¶
import pytest
from usso import USSOAuth
from usso.config import JWTConfig
@pytest.fixture
def usso_auth():
"""Create USSO auth instance"""
config = JWTConfig(
jwks_url="http://localhost:8000/.well-known/jwks.json",
issuer="http://localhost:8000",
audience="test-app"
)
return USSOAuth(config)
def test_token_verification(usso_auth):
"""Test actual token verification"""
# Get real token from USSO
token = login_and_get_token()
# Verify token
user = usso_auth.verify_token(token)
assert user.sub.startswith("user:")
assert len(user.roles) > 0
Best Practices¶
1. Use Environment Variables¶
import os
from usso.config import JWTConfig
config = JWTConfig(
jwks_url=os.getenv("USSO_JWKS_URL"),
issuer=os.getenv("USSO_ISSUER"),
audience=os.getenv("USSO_AUDIENCE")
)
2. Cache JWKS¶
from cachetools import TTLCache
jwks_cache = TTLCache(maxsize=100, ttl=3600)
config = JWTConfig(..., jwks_cache=jwks_cache)
3. Validate Scopes Early¶
@app.post("/resource")
def create_resource(
data: dict,
user: UserData = Depends(require_scope("write:resource"))
):
# Scope already validated by dependency
pass
4. Log Authentication Events¶
import logging
logger = logging.getLogger(__name__)
def authenticator_with_logging(user: UserData = Depends(authenticator)):
logger.info(
"User authenticated",
user_id=user.sub,
tenant=user.tenant_id,
workspace=user.workspace_id
)
return user
Troubleshooting¶
JWKS URL Not Accessible¶
# Verify JWKS URL is accessible
import requests
jwks_url = "http://localhost:8000/.well-known/jwks.json"
response = requests.get(jwks_url)
print(response.json())
Token Validation Fails¶
# Debug token validation
from usso import USSOAuth
auth = USSOAuth(config)
try:
user = auth.verify_token(token)
except Exception as e:
print(f"Validation error: {type(e).__name__}: {e}")
# Check token claims
import jwt
claims = jwt.decode(token, options={"verify_signature": False})
print(f"Token claims: {claims}")
Complete Example¶
Here's a complete example application:
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from usso.integrations.fastapi import get_authenticator
from usso.config import JWTConfig
from usso.schemas import UserData
from pydantic import BaseModel
import os
# Configuration
config = JWTConfig(
jwks_url=os.getenv("USSO_JWKS_URL"),
issuer=os.getenv("USSO_ISSUER"),
audience=os.getenv("USSO_AUDIENCE")
)
authenticator = get_authenticator(config)
security = HTTPBearer()
app = FastAPI(title="My API with USSO")
# Models
class Post(BaseModel):
title: str
content: str
# Helper functions
def require_scope(scope: str):
def checker(user: UserData = Depends(authenticator)):
if scope not in user.scopes:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Scope '{scope}' required"
)
return user
return checker
# Routes
@app.get("/")
def read_root():
return {"message": "Welcome to My API"}
@app.get("/me")
def get_current_user(user: UserData = Depends(authenticator)):
return {
"user_id": user.sub,
"tenant": user.tenant_id,
"workspace": user.workspace_id,
"roles": user.roles,
"scopes": user.scopes
}
@app.get("/posts")
def list_posts(user: UserData = Depends(require_scope("read:posts"))):
# Filter by workspace
posts = db.get_posts(workspace_id=user.workspace_id)
return {"posts": posts}
@app.post("/posts", status_code=status.HTTP_201_CREATED)
def create_post(
post: Post,
user: UserData = Depends(require_scope("write:posts"))
):
# Create post in user's workspace
new_post = db.create_post(
**post.dict(),
workspace_id=user.workspace_id,
author_id=user.sub
)
return {"post": new_post}
@app.delete("/posts/{post_id}")
def delete_post(
post_id: str,
user: UserData = Depends(require_scope("delete:posts"))
):
db.delete_post(post_id, workspace_id=user.workspace_id)
return {"message": "Post deleted"}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8001)
Next Steps¶
- JavaScript SDK - Frontend integration
- REST API - Direct API usage
- Authorization - Permission management