Javascript is required
DRF自定义JWT身份验证

Django Rest Framework(DRF)带有不同的内置身份验证类,令牌身份验证或JWT

技术栈

  • Django
  • Django Rest Framework

步骤

用户发送带有用户名和密码的POST请求进行登录,然后服务器将执行3件事

  • 生成一个access_token寿命很短的jwt(可能5分钟)并将其发送到响应正文中
  • 生成一个refresh_token寿命很长的jwt(天)并将其发送到httponly cookie中,因此无法从客户端javascript访问
  • 发送包含CSRF令牌的普通Cookie

开发人员需要确保所有不安全的视图(POST,UPDATE,PUT,Delete)均受内置的Django CSRF保护,因为如上所述,DRF默认禁用它们

在客户端,开发人员应注意

  • 在客户端,每个请求都将在cookie中自动包含刷新令牌(确保在服务器cors标头设置中将您的客户端域列入白名单)
  • 发送access_token中的Authorization标头
  • X-CSRFTOKEN如果他正在执行POST请求,请在标头中发送CSRF令牌
  • 当他需要一个新的时access_token,他需要发送一个POST请求来刷新令牌端点

准备工作创建虚拟环境

python -m venv venv
pip install django
pip install django-cors-headers # 跨域
pip install djangorestframework 
pip install pyjwt
pip install django-simpleui # 后台UI框架 选用

项目设置

# 创建一个Django项目
django-admin startapp project

# 创建一个App
python manage.py startapp accounts

在项目设置中启用应用程序并添加一些设置

# Settings.py
INSTALLED_APPS = [
    ...
    # 插件
    'corsheaders',
    'rest_framework',

    #Apps
    'accounts',
]

CORS_ALLOW_CREDENTIALS = True  # 通过ajax请求接受cookie

REST_FRAMEWORK = {
    'DEFAULT_PERMISSION_CLASSES': (
        'rest_framework.permissions.IsAuthenticated', # 使所有端口私有
    )
}

创建后台用户

为Django身份验证中预定义的User模型编写API,因此我们无需定义新模型,而是可以继续描述序列化程序

# (accounts)models.py
from django.db import models

from django.contrib.auth.models import AbstractUser

class User(AbstractUser):
    pass

然后在后台指定用户模型

---
AUTH_USER_MODEL = 'accounts.User' # accounts这个下的User模型
---

稍后您可以通过以下任一方式来引用用户模型

from django.conf import settings
User = settings.AUTH_USER_MODEL

# 或者

from django.contrib.auth import get_user_model
User = get_user_model()

并将accounts.admin新的用户模型注册到管理站点

# (accounts)admin.py
from django.contrib import admin
from django.contrib.auth.admin import UserAdmin
from accounts.models import User

admin.site.register(User, UserAdmin)

创建超级用户

makemigrations # 生成迁移模型

migrate

createsuperuser # 创建用户

最后是要测试的端点,正如我们在上述设置中声明的那样,默认情况下,所有端点都将需要身份验证,我们可以在某些视图中覆盖此身份,稍后我们将在登录时进行此操作


我们将创建用户配置文件端点,该端点将以JSON返回当前经过身份验证的用户对象,为此,我们需要创建用户序列化程序

创建用户序列化程序

from rest_framework import serializers
from .models import User # 导入模型

class UserSerializer(serializers.ModelSerializer):
    class Meta:
        model = User
        fields = ['id', 'username', 'email', 'first_name', 'last_name', 'is_active']
        # 或者排除掉密码
        # exclude = ['password']

urls.py

# 根目录/urls.py
from django.urls import path, include

urlpatterns = [
    path('accounts/', include('accounts.urls')), # 引入App/accounts
]


# APP(accounts).url
urlpatterns = [
    path('profile', profile, name='profile')
]

views.py

from rest_framework.decorators import api_view
from rest_framework.response import Response
from .serializers import UserSerializer # 导入序列化


'''
    获取用户信息视图
'''
@api_view(['GET'])
def profile(request):
    user = request.user
    serialized_user = UserSerializer(user).data
    return Response({'user': serialized_user })

现在,如果您尝试访问此端点,则会收到403错误,如简介中所述.

我们需要登录,然后在请求标头中发送access_token

登录视图

登录端将是带有usernamepassword在请求正文中的发布请求。

我们将使用权限类装饰器将登录视图公开AllowAny,并且@ensure_csrf_cookie如果登录成功,将强制Django在响应中发送CSRF cookie来装饰视图

如果登录成功,我们将有

  • 一个access_token在响应体中
  • 一个refreshtoken在的HttpOnly的cookie
  • 一个csrftoken在正常的cookie,所以我们可以从JavaScript阅读并在需要时重新发送
from django.shortcuts import render
from rest_framework.views import  APIView
from rest_framework.response import Response
from .serializers import *

from django.contrib.auth import get_user_model
from rest_framework import exceptions
from rest_framework.permissions import AllowAny
from django.views.decorators.csrf import  ensure_csrf_cookie
from .utils import *
from .authentications import JWTAuthentication


'''
    获取用户信息
'''
class Profile(APIView):
    authentication_classes = [JWTAuthentication]
    def get(self, request):
        user = request.user
        serialized_user = UserSerializer(user).data
        return Response({'user': serialized_user})
'''
    登录视图
'''
class Login(APIView):
    permission_classes = [AllowAny]
    authentication_classes = [JWTAuthentication]

    def get(self, request):
        return render(request, 'login.html')

    def post(self, request):
        User = get_user_model()
        username = request.data.get('username')
        password = request.data.get('password')
        print(username, password)

        response = Response()
        if (username is None) or (password is None):
            raise exceptions.AuthenticationFailed('用户名或密码为空')

        user = User.objects.filter(username=username).first()

        if (user is None):
            raise exceptions.AuthenticationFailed('用户未找到')
        if (not user.check_password(password)):
            raise exceptions.AuthenticationFailed('密码错误')

        serialized_user = UserSerializer(user).data

        access_token = generate_access_token(user)
        refresh_token = generate_refresh_token(user)
        print(access_token)
        print(refresh_token)

        response.set_cookie(key='refreshtoken', value=refresh_token, httponly=True)

        response.data = {
            'access_token': access_token,
            'user': serialized_user
        }

        return response

标记

也可以用drf内置的获取用户名与密码。

from rest_framework.authtoken.serializers import AuthTokenSerializer

def post(self,request):
    serializer = AuthTokenSerializer(data=request.data) # 序列化
    if serializer.is_valid():
        user = serializer.validated_data.get('user')
        user.last_login = now() # 记录用户最后登录时间6
        user.save()
        token = generate_jwt(user)
        user_serializer = UserSerializer(user)
        return Response({"token":token,"user":user_serializer.data})

这是生成令牌的函数

import datetime
import jwt
from django.conf import settings

'''
    生成登录成功Token
'''
def generate_access_token(user):

    access_token_payload = {
        'user_id': user.id,
        'exp': datetime.datetime.utcnow() + datetime.timedelta(days=0, minutes=5),#5分
        'iat': datetime.datetime.utcnow(),
    }
    access_token = jwt.encode(access_token_payload,
                              settings.SECRET_KEY, algorithm='HS256').decode('utf-8')
    return access_token


def generate_refresh_token(user):
    refresh_token_payload = {
        'user_id': user.id,
        'exp': datetime.datetime.utcnow() + datetime.timedelta(days=7),# 7天
        'iat': datetime.datetime.utcnow()
    }
    refresh_token = jwt.encode(
        refresh_token_payload, settings.SECRET_KEY, algorithm='HS256').decode('utf-8')

    return refresh_token


DRF的自定义身份验证类

Django Rest Framework使创建自定义身份验证方案变得容易,它在https://www.django-rest-framework.org/api-guide/authentication/#custom-authentication中进行了详细描述

先看一下drf的源代码

from rest_framework.authentication import TokenAuthentication,

先复制下来然后稍加修改

创建一个authentications.py

from rest_framework.authentication import BaseAuthentication, TokenAuthentication, get_authorization_header
from rest_framework import exceptions
import jwt
from django.conf import settings
from django.contrib.auth import get_user_model

User = get_user_model()

# 1. 继承BaseAuthentication
class JWTAuthentication(BaseAuthentication):
    '''
    自定义头
    Authorization: JWT 401f7ac837da42b97f613d789819ff93537bee6a
    '''
    keyword = 'JWT'
    model = None


    def authenticate(self, request):
        # 拿到jwt值
        auth = get_authorization_header(request).split()
        

        if not auth or auth[0].lower() != self.keyword.lower().encode():
            return None

        if len(auth) == 1:
            msg = '认证失败'
            raise exceptions.AuthenticationFailed(msg)
        elif len(auth) > 2:
            msg = '应该提供一个空格'
            raise exceptions.AuthenticationFailed(msg)

        try:
            jwt_token = auth[1]
            # 解码
            jwt_info = jwt.decode(jwt_token, settings.SECRET_KEY, algorithms=['HS256'])
            # 提取用户ID 对应加密的user_id
            user_id = jwt_info.get('user_id')
            try:
                user = User.objects.get(pk=user_id)
                return (user, None)
            except:
                msg = '用户名不存在'
                raise exceptions.AuthenticationFailed(msg)

        except jwt.ExpiredSignatureError:
            msg = 'Token 过期'
            raise exceptions.AuthenticationFailed(msg)

创建该类后,然后再去settings.py中更改全局配置

REST_FRAMEWORK = {
    'DEFAULT_AUTHENTICATION_CLASSES': (
        'accounts.authentications.JWTAuthentication',# APP.文件名.类名
    ),
    'DEFAULT_PERMISSION_CLASSES': (
        'rest_framework.permissions.IsAuthenticated', # 使所有端口私有
    ),
}

重新访问试试。 这次我们将设置Authorization标头

刷新令牌视图

每当令牌过期或出于任何原因需要新令牌时,我们都需要refresh_token

该视图需要获得的许可,AlloAnyaccess_token但是它将受到其他2件事的保护

  • refresh_tokenhttoponly cookie中发送的有效信息
  • CSRF令牌,因此我们确保如果满足两个条件,上述cookie不会受到损害,然后服务器将生成一个新的有效值access_token并将其发送回去

如果refresh_token无效或过期,则用户需要重新登录

还记得刚才登录时候保存的refreshtoken吗

import jwt
class RefreshTokenView(APIView):
    permission_classes = [AllowAny]

    def post(self, request):
        '''
         To obtain a new access_token this view expects 2 important things:
        1. a cookie that contains a valid refresh_token
        2. a header 'X-CSRFTOKEN' with a valid csrf token, client app can get it from cookies "csrftoken"
        '''
        User = get_user_model()
        refresh_token = request.COOKIES.get('refreshtoken')
        if refresh_token is None:
            raise  exceptions.AuthenticationFailed('没有提供身份验证凭据')

        try:
            payload = jwt.decode(refresh_token, settings.SECRET_KEY, algorithms=['HS256'])
        except jwt.ExpiredSignatureError:
            raise exceptions.AuthenticationFailed('过期的刷新令牌,请重新登录')

        user = User.objects.filter(id = payload.get('user_id')).first()
        if user is None:
            raise exceptions.AuthenticationFailed('用户未找到')

        access_token = generate_access_token(user)
        return Response({'access_token': access_token})

参考

[Site](