您现在的位置是:首页>文章详情

Flask + PyJWT 实现基于Json Web Token的用户认证授权

泛逸舟

在程序开发中,用户认证授权是一个绕不过的重难点。以前的开发模式下,cookie和session认证是主流,随着前后端分离的趋势,基于Token的认证方式成为主流,而JWT是基于Token认证方式的一种机制,是实现单点登录认证的一种有效方法。

PyJWT是一个用来编码和解码JWT(JSON Web Tokens)的Python库,也可以用在Flask上。本文就通过一个实例来演示Flask项目整合PyJWT来实现基于Token的用户认证授权。

 

一、需求

1、程序将实现一个用户注册、登录和获取用户信息的功能

2、用户注册时输入用户名(username)、邮箱(email)和密码(password),用户名和邮箱是唯一的,如果数据库中已有则会注册失败;用户注册成功后返回用户的信息。

3、用户使用用户名(username)和密码(password)登录,登录成功时返回token,每次登录都会更新一次token。

4、用户要获取用户信息,需要在请求Header中传入验证参数和token,程序会验证这个token的有效性并给出响应。

5、程序构建方面,将用户和认证分列两个模块。

 

二、程序目录结构

根据示例需求构建程序目录结构:

 

三、程序实现

1、程序构建及相关文件

数据迁移配置文件:

 
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand
from run import app
from app import db
 
app.config.from_object('app.config')
 
db.init_app(app)
 
migrate = Migrate(app, db)
manager = Manager(app)
manager.add_command('db', MigrateCommand)
 
if __name__ == '__main__':
manager.run()
 
 
 
flask-pyjwt-auth/db.py

运行入口文件:

 
 
from app import create_app
 
app = create_app('app.config')
 
if __name__ == '__main__':
app.run(host=app.config['HOST'],
port=app.config['PORT'],
debug=app.config['DEBUG'])
 
 
 
flask-pyjwt-auth/run.py

程序初始化文件:

 
 
from flask import Flask, request
from flask_sqlalchemy import SQLAlchemy
 
db = SQLAlchemy()
 
def create_app(config_filename):
app = Flask(__name__)
app.config.from_object(config_filename)
 
@app.after_request
def after_request(response):
response.headers.add('Access-Control-Allow-Origin', '*')
if request.method == 'OPTIONS':
response.headers['Access-Control-Allow-Methods'] = 'DELETE, GET, POST, PUT'
headers = request.headers.get('Access-Control-Request-Headers')
if headers:
response.headers['Access-Control-Allow-Headers'] = headers
return response
 
from app.users.model import db
db.init_app(app)
 
from app.users.api import init_api
init_api(app)
 
return app
 
 
 
flask-pyjwt-auth/app/__init__.py

上面代码加入了全局HTTP请求头配置,设置允许所有跨域请求。

配置文件:

 
 
DB_USER = 'root'
DB_PASSWORD = ''
DB_HOST = 'localhost'
DB_DB = 'flask-pyjwt-auth'
 
DEBUG = True
PORT = 3333
HOST = "192.168.1.141"
SECRET_KEY = "my blog"
 
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_DATABASE_URI = 'mysql://' + DB_USER + ':' + DB_PASSWORD + '@' + DB_HOST + '/' + DB_DB
 
 
 
flask-pyjwt-auth/app/config.py

公共文件:

 
def trueReturn(data, msg):
return {
"status": True,
"data": data,
"msg": msg
}
 
 
def falseReturn(data, msg):
return {
"status": False,
"data": data,
"msg": msg
}
 
 
 
 
flask-pyjwt-auth/app/common.py

2、用户模块

模块入口(空)

 
flask-pyjwt-auth/app/users/__init__.py

用户模型: 

 
from flask_sqlalchemy import SQLAlchemy
from sqlalchemy.exc import SQLAlchemyError
from werkzeug.security import generate_password_hash, check_password_hash
 
from app import db
 
class Users(db.Model):
id = db.Column(db.Integer, primary_key=True)
email = db.Column(db.String(250), unique=True, nullable=False)
username = db.Column(db.String(250), unique=True, nullable=False)
password = db.Column(db.String(250))
login_time = db.Column(db.Integer)
 
def __init__(self, username, password, email):
self.username = username
self.password = password
self.email = email
 
def __str__(self):
return "Users(id='%s')" % self.id
 
def set_password(self, password):
return generate_password_hash(password)
 
def check_password(self, hash, password):
return check_password_hash(hash, password)
 
def get(self, id):
return self.query.filter_by(id=id).first()
 
def add(self, user):
db.session.add(user)
return session_commit()
 
def update(self):
return session_commit()
 
def delete(self, id):
self.query.filter_by(id=id).delete()
return session_commit()
 
 
def session_commit():
try:
db.session.commit()
except SQLAlchemyError as e:
db.session.rollback()
reason = str(e)
return reason
 
 
 
flask-pyjwt-auth/app/users/model.py

在上面用户模型定义中,定义了set_password和check_password方法,分别用来加密用户注册时填写的密码(将加密后的密码写入数据库)和在用户登录时检查用户密码是否正确。

用户相关接口实现:

 
 
from flask import jsonify, request
from app.users.model import Users
from app.auth.auths import Auth
from .. import common
 
def init_api(app):
@app.route('/register', methods=['POST'])
def register():
"""
:return: json
"""
email = request.form.get('email')
username = request.form.get('username')
password = request.form.get('password')
user = Users(email=email, username=username, password=Users.set_password(Users, password))
result = Users.add(Users, user)
if user.id:
returnUser = {
'id': user.id,
'username': user.username,
'email': user.email,
'login_time': user.login_time
}
return jsonify(common.trueReturn(returnUser, ""))
else:
return jsonify(common.falseReturn('', ''))
 
 
@app.route('/login', methods=['POST'])
def login():
"""
:return: json
"""
username = request.form.get('username')
password = request.form.get('password')
if (not username or not password):
return jsonify(<span class="ace_identifi

评论

评论插件