基于Flask的用户登录注册(token认证)

  • 分类: Python
  • 发表日期:2021-07-30 12:01:00
  • 最后修改:2021-07-30 12:03:00

用户模型

from datetime import datetime
from werkzeug.security import generate_password_hash, check_password_hash
from itsdangerous import TimedJSONWebSignatureSerializer as Serializer, BadSignature, SignatureExpired
from flask import current_app

from app import db
from app import redis_conn as redis

class User(db.Model):
    '''用户模型
    '''
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(255), unique=True, index=True)
    nickname = db.Column(db.String(255), nullable=True, index=True) 
    password_hash = db.Column(db.String(128))
    join_time = db.Column(db.DateTime, default=datetime.now)
    last_seen = db.Column(db.DateTime, default=datetime.now)

    @property
    def password(self):
        raise AttributeError('密码不可访问')

    @password.setter
    def password(self, password):
        '''生成hash密码'''
        self.password_hash = generate_password_hash(password)

    def verify_password(self, password):
        '''
        密码验证
        :param password-> 用户密码
        :return 验证成功返回True,否则返回False
        '''
        return check_password_hash(self.password_hash, password)

    def generate_user_token(self, expiration):
        '''
        生成确认身份的Token(密令)
        :param expiration-> 有效期限,单位为秒/此处默认设置为12h
        :return 加密过的token
        '''
        s = Serializer(current_app.config['SECRET_KEY'], expires_in=expiration)
        return s.dumps({'id': self.id, 'email': self.email}).decode('utf-8')

    # 解析token,确认登录的用户身份
    @staticmethod
    def verify_user_token(token):
        s = Serializer(current_app.config['SECRET_KEY'])
        try:
            data = s.loads(token)
        except SignatureExpired:
            return None  # valid token, but expired
        except BadSignature:
            return None  # invalid token
        user = User.query.get(data['id'])
        return user

    #额外的功能
    def update_last_seen(self):
        '''更新最后一次登录时间'''
        self.last_seen = datetime.now()
        db.session.add(self)

    def update_rftoken(self, rftoken):
        redis.setex(f'rftoken-{self.email}', current_app.config['RFTOKEN_EXPIRE'], rftoken)

    def update_token(self, token):
        redis.setex(f'token-{self.email}', current_app.config['TOKEN_EXPIRE'], token)

    def get_rftoken(self):
        value = redis.get(f'rftoken-{self.email}')
        if value:
            return str(value, encoding='utf8')
        else:
            return None

    def get_token(self):
        value = redis.get(f'token-{self.email}')
        if value:
            return str(value, encoding='utf8')
        else:
            return None

    def to_json(self):
        '''返回用户信息'''
        return {
            'user_id': self.id,
            'nickname': self.nickname,
            # 'email': self.email,
        }

    def __repr__(self):
        return '<User %r>' % self.nickname

    #TODO: 可以添加用户头像

 

视图函数

import os
from flask import Flask, current_app, make_response, jsonify, request, g
from app import db, redis_conn
from app.api_v1 import api
from app.models import User
from utils.response_code import RET, error_map
from utils import timestamp_to_format


@api.route('/signin', methods=['POST'])
def signin():
    '''用户注册接口
    :return 返回注册信息{'ret_code': '0', 'msg': '注册成功'}
    '''
    # nickname =request.json.get('nickname')
    email =request.form.get('email')
    password =request.form.get('password')
    # mailcode_client =request.json.get('mailcode')

    if not all([email, password]):
        return jsonify(ret_code=RET.PARAMERR, msg='参数不完整')

    #从Redis中获取此邮箱对应的验证码,与前端传来的数据校验
    # try:
    #     mailcode_server = redis_conn.get('EMAILCODE:'+ email).decode()
    # except Exception as e:
    #     current_app.logger.debug(e)
    #     return jsonify(ret_code=RET.DBERR, msg='查询邮箱验证码失败')
    # if mailcode_server != mailcode_client:
    #     current_app.logger.debug(mailcode_server)
    #     return jsonify(ret_code=RET.PARAMERR, msg='邮箱验证码错误')

    user = User()
    user.email = email
    # user.nickname = nickname
    user.password = password    #利用user model中的类属性方法加密用户的密码并存入数据库
    try:
        db.session.add(user)
        db.session.commit()
    except Exception as e:
        current_app.logger.debug(e)
        db.session.rollback()
        return jsonify(ret_code=RET.DBERR, msg='注册失败')
    #6.响应结果
    return jsonify(ret_code=RET.OK, msg='注册成功')

@api.route('/login', methods=['POST'])
def login():
    '''登录
    TODO: 添加图片验证
    :return 返回响应,保持登录状态
    '''
    email = request.form.get('email')
    password = request.form.get('password')

    #解析Authorization
    #email, password = base64.b64decode(request.headers['Authorization'].split(' ')[-1]).decode().split(':')

    if not all([email, password]):
        return jsonify(ret_code=RET.PARAMERR, msg=error_map[RET.PARAMERR])

    try:
        user = User.query.filter(User.email==email).first()

    except Exception as e:
        current_app.logger.debug(e)
        return jsonify(ret_code=RET.DBERR, msg=error_map[RET.DBERR])

    if not user:
        return jsonify(ret_code=RET.USERERR, msg=error_map[RET.USERERR])

    if not user.verify_password(password):
        return jsonify(ret_code=RET.PWDERR, msg=error_map[RET.PWDERR])

    g.user = user
    rftoken = user.generate_user_token(expiration=current_app.config['RFTOKEN_EXPIRE'])
    token = user.generate_user_token(expiration=current_app.config['TOKEN_EXPIRE'])
    #更新最后一次登录时间
    user.update_last_seen()
    # mysql 更新 rftoken
    user.update_rftoken(rftoken)
    # redis 更新 token
    user.update_token(token)

    data = {
        'ret_code': RET.OK,
        'msg': {
            'token': token,
            'rftoken': rftoken
        }
    }
    return jsonify(data)

@api.route('/refresh', methods=['GET'])
def refresh_token():
    token = request.headers.get('Authorization', '')
    if token:
        user = User.verify_user_token(token)

        if not user:
            return jsonify(ret_code=RET.NOTUSER, msg=error_map[RET.NOTUSER])

        g.user = user

        if user.get_rftoken() == token:

            token = user.generate_user_token(expiration=current_app.config['TOKEN_EXPIRE'])
            rftoken = user.generate_user_token(expiration=current_app.config['RFTOKEN_EXPIRE'])
            # mysql 更新 rftoken
            user.update_rftoken(rftoken)
            # redis 更新 token
            user.update_token(token)

            data = {
                'ret_code': RET.OK,
                'msg': {
                    'token': token,
                    'rftoken': rftoken,
                }
            }
            return jsonify(data)
        else:
            return jsonify(ret_code=RET.NOTUSER, msg=error_map[RET.NOTUSER])
    else:
        return jsonify(ret_code=RET.NOTUSER, msg=error_map[RET.NOTUSER])

@api.errorhandler(404)
def page_not_found(error):
    data = {'msg':'404 error'}
    response = make_response(jsonify(data), 404)
    return response

 

post
2021年7月9日 09:40 原创 草稿

针对情报平台的多种 es dsl 测试

post
2021年7月13日 09:36 原创
post
2021年7月30日 12:15 原创
post
2021年7月30日 15:07 原创
post
2021年7月30日 15:13 原创
post
2021年7月30日 15:18 原创
post
2021年7月30日 15:24 原创
post
2021年7月30日 16:09 原创
post
2021年7月30日 16:02 原创
post
2021年8月16日 15:28 原创
post
2021年8月16日 20:01
post
2021年8月17日 12:07 原创
post
2021年8月31日 15:42 原创
post
2021年10月8日 16:17
post
2021年10月13日 11:43
post
2021年10月21日 15:47 原创
post
2021年10月25日 11:27

0 评论

大哥整点话呗~