Django Rest Framework(DRF)带有不同的内置身份验证类,令牌身份验证或JWT
技术栈
- Django
- Django Rest Framework
步骤
用户发送带有用户名和密码的POST请求进行登录,然后服务器将执行3件事
- 生成一个
access_token
寿命很短的jwt(可能5分钟)并将其发送到响应正文中 - 生成一个
refresh_token
寿命很长的jwt(天)并将其发送到httponly cookie中,因此无法从客户端javascript访问 - 发送包含CSRF令牌的普通Cookie
开发人员需要确保所有不安全的视图(POST,UPDATE,PUT,Delete)均受内置的Django CSRF保护,因为如上所述,DRF默认禁用它们
在客户端,开发人员应注意
- 在客户端,每个请求都将在cookie中自动包含刷新令牌(确保在服务器cors标头设置中将您的客户端域列入白名单)
- 发送
access_token
中的Authorization
标头 X-CSRFTOKEN
如果他正在执行POST请求,请在标头中发送CSRF令牌- 当他需要一个新的时
access_token
,他需要发送一个POST请求来刷新令牌端点
准备工作创建虚拟环境
python -m venv venv
pip install django
pip install django-cors-headers # 跨域
pip install djangorestframework
pip install pyjwt
pip install django-simpleui # 后台UI框架 选用
项目设置
# 创建一个Django项目
django-admin startapp project
# 创建一个App
python manage.py startapp accounts
在项目设置中启用应用程序并添加一些设置
# Settings.py
INSTALLED_APPS = [
...
# 插件
'corsheaders',
'rest_framework',
#Apps
'accounts',
]
CORS_ALLOW_CREDENTIALS = True # 通过ajax请求接受cookie
REST_FRAMEWORK = {
'DEFAULT_PERMISSION_CLASSES': (
'rest_framework.permissions.IsAuthenticated', # 使所有端口私有
)
}
创建后台用户
为Django身份验证中预定义的User模型编写API,因此我们无需定义新模型,而是可以继续描述序列化程序
# (accounts)models.py
from django.db import models
from django.contrib.auth.models import AbstractUser
class User(AbstractUser):
pass
然后在后台指定用户模型
---
AUTH_USER_MODEL = 'accounts.User' # accounts这个下的User模型
---
稍后您可以通过以下任一方式来引用用户模型
from django.conf import settings
User = settings.AUTH_USER_MODEL
# 或者
from django.contrib.auth import get_user_model
User = get_user_model()
并将accounts.admin
新的用户模型注册到管理站点
# (accounts)admin.py
from django.contrib import admin
from django.contrib.auth.admin import UserAdmin
from accounts.models import User
admin.site.register(User, UserAdmin)
创建超级用户
makemigrations # 生成迁移模型
migrate
createsuperuser # 创建用户
最后是要测试的端点,正如我们在上述设置中声明的那样,默认情况下,所有端点都将需要身份验证,我们可以在某些视图中覆盖此身份,稍后我们将在登录时进行此操作
我们将创建用户配置文件端点,该端点将以JSON返回当前经过身份验证的用户对象,为此,我们需要创建用户序列化程序
创建用户序列化程序
from rest_framework import serializers
from .models import User # 导入模型
class UserSerializer(serializers.ModelSerializer):
class Meta:
model = User
fields = ['id', 'username', 'email', 'first_name', 'last_name', 'is_active']
# 或者排除掉密码
# exclude = ['password']
urls.py
# 根目录/urls.py
from django.urls import path, include
urlpatterns = [
path('accounts/', include('accounts.urls')), # 引入App/accounts
]
# APP(accounts).url
urlpatterns = [
path('profile', profile, name='profile')
]
views.py
from rest_framework.decorators import api_view
from rest_framework.response import Response
from .serializers import UserSerializer # 导入序列化
'''
获取用户信息视图
'''
@api_view(['GET'])
def profile(request):
user = request.user
serialized_user = UserSerializer(user).data
return Response({'user': serialized_user })
现在,如果您尝试访问此端点,则会收到403错误,如简介中所述.
我们需要登录,然后在请求标头中发送access_token
登录视图
登录端将是带有username
和password
在请求正文中的发布请求。
我们将使用权限类装饰器将登录视图公开AllowAny
,并且@ensure_csrf_cookie
如果登录成功,将强制Django在响应中发送CSRF cookie来装饰视图
如果登录成功,我们将有
- 一个
access_token
在响应体中 - 一个
refreshtoken
在的HttpOnly的cookie - 一个
csrftoken
在正常的cookie,所以我们可以从JavaScript阅读并在需要时重新发送
from django.shortcuts import render
from rest_framework.views import APIView
from rest_framework.response import Response
from .serializers import *
from django.contrib.auth import get_user_model
from rest_framework import exceptions
from rest_framework.permissions import AllowAny
from django.views.decorators.csrf import ensure_csrf_cookie
from .utils import *
from .authentications import JWTAuthentication
'''
获取用户信息
'''
class Profile(APIView):
authentication_classes = [JWTAuthentication]
def get(self, request):
user = request.user
serialized_user = UserSerializer(user).data
return Response({'user': serialized_user})
'''
登录视图
'''
class Login(APIView):
permission_classes = [AllowAny]
authentication_classes = [JWTAuthentication]
def get(self, request):
return render(request, 'login.html')
def post(self, request):
User = get_user_model()
username = request.data.get('username')
password = request.data.get('password')
print(username, password)
response = Response()
if (username is None) or (password is None):
raise exceptions.AuthenticationFailed('用户名或密码为空')
user = User.objects.filter(username=username).first()
if (user is None):
raise exceptions.AuthenticationFailed('用户未找到')
if (not user.check_password(password)):
raise exceptions.AuthenticationFailed('密码错误')
serialized_user = UserSerializer(user).data
access_token = generate_access_token(user)
refresh_token = generate_refresh_token(user)
print(access_token)
print(refresh_token)
response.set_cookie(key='refreshtoken', value=refresh_token, httponly=True)
response.data = {
'access_token': access_token,
'user': serialized_user
}
return response
标记
也可以用drf内置的获取用户名与密码。
from rest_framework.authtoken.serializers import AuthTokenSerializer
def post(self,request):
serializer = AuthTokenSerializer(data=request.data) # 序列化
if serializer.is_valid():
user = serializer.validated_data.get('user')
user.last_login = now() # 记录用户最后登录时间6
user.save()
token = generate_jwt(user)
user_serializer = UserSerializer(user)
return Response({"token":token,"user":user_serializer.data})
这是生成令牌的函数
import datetime
import jwt
from django.conf import settings
'''
生成登录成功Token
'''
def generate_access_token(user):
access_token_payload = {
'user_id': user.id,
'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, minutes=5),#5分
'iat': datetime.datetime.utcnow(),
}
access_token = jwt.encode(access_token_payload,
settings.SECRET_KEY, algorithm='HS256').decode('utf-8')
return access_token
def generate_refresh_token(user):
refresh_token_payload = {
'user_id': user.id,
'exp': datetime.datetime.utcnow() + datetime.timedelta(days=7),# 7天
'iat': datetime.datetime.utcnow()
}
refresh_token = jwt.encode(
refresh_token_payload, settings.SECRET_KEY, algorithm='HS256').decode('utf-8')
return refresh_token
DRF的自定义身份验证类
Django Rest Framework使创建自定义身份验证方案变得容易,它在https://www.django-rest-framework.org/api-guide/authentication/#custom-authentication中进行了详细描述
先看一下drf的源代码
from rest_framework.authentication import TokenAuthentication,
先复制下来然后稍加修改
创建一个authentications.py
from rest_framework.authentication import BaseAuthentication, TokenAuthentication, get_authorization_header
from rest_framework import exceptions
import jwt
from django.conf import settings
from django.contrib.auth import get_user_model
User = get_user_model()
# 1. 继承BaseAuthentication
class JWTAuthentication(BaseAuthentication):
'''
自定义头
Authorization: JWT 401f7ac837da42b97f613d789819ff93537bee6a
'''
keyword = 'JWT'
model = None
def authenticate(self, request):
# 拿到jwt值
auth = get_authorization_header(request).split()
if not auth or auth[0].lower() != self.keyword.lower().encode():
return None
if len(auth) == 1:
msg = '认证失败'
raise exceptions.AuthenticationFailed(msg)
elif len(auth) > 2:
msg = '应该提供一个空格'
raise exceptions.AuthenticationFailed(msg)
try:
jwt_token = auth[1]
# 解码
jwt_info = jwt.decode(jwt_token, settings.SECRET_KEY, algorithms=['HS256'])
# 提取用户ID 对应加密的user_id
user_id = jwt_info.get('user_id')
try:
user = User.objects.get(pk=user_id)
return (user, None)
except:
msg = '用户名不存在'
raise exceptions.AuthenticationFailed(msg)
except jwt.ExpiredSignatureError:
msg = 'Token 过期'
raise exceptions.AuthenticationFailed(msg)
创建该类后,然后再去settings.py
中更改全局配置
REST_FRAMEWORK = {
'DEFAULT_AUTHENTICATION_CLASSES': (
'accounts.authentications.JWTAuthentication',# APP.文件名.类名
),
'DEFAULT_PERMISSION_CLASSES': (
'rest_framework.permissions.IsAuthenticated', # 使所有端口私有
),
}
重新访问试试。 这次我们将设置Authorization
标头
刷新令牌视图
每当令牌过期或出于任何原因需要新令牌时,我们都需要refresh_token
。
该视图需要获得的许可,AlloAny
,access_token
但是它将受到其他2件事的保护
refresh_token
httoponly cookie中发送的有效信息- CSRF令牌,因此我们确保如果满足两个条件,上述cookie不会受到损害,然后服务器将生成一个新的有效值
access_token
并将其发送回去
如果refresh_token
无效或过期,则用户需要重新登录
还记得刚才登录时候保存的refreshtoken吗
import jwt
class RefreshTokenView(APIView):
permission_classes = [AllowAny]
def post(self, request):
'''
To obtain a new access_token this view expects 2 important things:
1. a cookie that contains a valid refresh_token
2. a header 'X-CSRFTOKEN' with a valid csrf token, client app can get it from cookies "csrftoken"
'''
User = get_user_model()
refresh_token = request.COOKIES.get('refreshtoken')
if refresh_token is None:
raise exceptions.AuthenticationFailed('没有提供身份验证凭据')
try:
payload = jwt.decode(refresh_token, settings.SECRET_KEY, algorithms=['HS256'])
except jwt.ExpiredSignatureError:
raise exceptions.AuthenticationFailed('过期的刷新令牌,请重新登录')
user = User.objects.filter(id = payload.get('user_id')).first()
if user is None:
raise exceptions.AuthenticationFailed('用户未找到')
access_token = generate_access_token(user)
return Response({'access_token': access_token})
参考
[Site](