Flask 实现Token认证机制

这篇具有很好参考价值的文章主要介绍了Flask 实现Token认证机制。希望对大家有所帮助。如果存在错误或未考虑完全的地方,请大家不吝赐教,您也可以点击"举报违法"按钮提交疑问。

在Flask框架中,实现Token认证机制并不是一件复杂的事情。除了使用官方提供的flask_httpauth模块或者第三方模块flask-jwt,我们还可以考虑自己实现一个简易版的Token认证工具。自定义Token认证机制的本质是生成一个令牌(Token),并在用户每次请求时验证这个令牌的有效性。

整个过程可以分为以下几个步骤:

  1. 用户登录时生成Token,并将Token与用户关联存储在服务器端。
  2. 用户在请求时携带Token。
  3. 服务器在收到请求后,验证Token的有效性。
  4. 如果Token有效,允许用户访问相应资源;否则,拒绝访问。

这种自定义的Token认证机制相对简单,适用于一些小型应用或者对于Token认证机制有特殊需求的场景。搭建这样一个简易的认证系统有助于理解Token认证的基本原理,并可以根据实际需求进行灵活的定制。

创建表结构

通过表结构的创建,建立用户认证和会话管理表。UserAuthDB表存储了用户的账号密码信息,而SessionAuthDB表则存储了用户登录后生成的Token信息,包括用户名、Token本身以及Token的过期时间。这为后续实现用户注册、登录以及Token认证等功能提供了数据库支持。

UserAuthDB表:

  • 用途:存储用户账号密码信息。
  • 字段:
    • id: 主键,自增,唯一标识每个用户。
    • username: 用户名,非空,唯一,用于登录时识别用户。
    • password: 密码,非空,用于验证用户身份。

SessionAuthDB表:

  • 用途:存储登录成功后用户的Token信息。
  • 字段:
    • id: 主键,自增,唯一标识每个登录会话。
    • username: 用户名,非空,唯一,关联到UserAuthDB表的用户名。
    • token: 用户登录后生成的Token,非空,唯一,用于身份验证。
    • invalid_date: Token的过期时间,用于判断Token是否过期。

代码通过Flask路由/create实现了数据库表结构的创建,主要包括两张表,分别是UserAuthDBSessionAuthDB

@app.route("/create",methods=["GET"])
def create():
    conn = sqlite3.connect("./database.db")
    cursor = conn.cursor()
    create_auth = "create table UserAuthDB(" \
             "id INTEGER primary key AUTOINCREMENT not null unique," \
             "username varchar(64) not null unique," \
             "password varchar(64) not null" \
             ")"
    cursor.execute(create_auth)

    create_session = "create table SessionAuthDB(" \
                     "id INTEGER primary key AUTOINCREMENT not null unique," \
                     "username varchar(64) not null unique," \
                     "token varchar(128) not null unique," \
                     "invalid_date int not null" \
                     ")"

    cursor.execute(create_session)
    conn.commit()
    cursor.close()
    conn.close()
    return "create success"

验证函数

该验证函数用于保证传入的用户名和密码满足一定的安全性和格式要求。通过对长度和字符内容的检查,确保了传入的参数不会导致潜在的安全问题。这样的验证机制在用户注册、登录等场景中可以有效地防止一些常见的安全漏洞。

参数验证:

  • 接受不定数量的参数*kwargs,可传入多个参数。
  • 对于每个传入的参数,首先验证其长度是否在合法范围内(小于128个字符且不为空)。

字符串处理:

  • 将参数转换为小写形式,然后去除两侧空格,并移除所有空格。

字符内容验证:

  • 遍历处理后的字符串,检查其中的字符是否仅包含大写字母、小写字母和数字。如果出现其他字符,则认为非法。

返回结果:

  • 如果所有参数验证通过,即长度合法且字符内容符合要求,则返回True,表示参数合法。
  • 如果有任何一个参数不合法,则返回False,表示参数存在非法字符或超出长度限制。

代码定义了一个名为CheckParameters的验证函数,该函数用于验证传入的参数是否合法。主要验证的对象是用户名和密码,具体概述如下:

def CheckParameters(*kwargs):
    for item in range(len(kwargs)):
        # 先验证长度
        if len(kwargs[item]) >= 128 or len(kwargs[item]) == 0:
            return False

        # 先小写,然后去掉两侧空格,去掉所有空格
        local_string = kwargs[item].lower().strip().replace(" ","")

        # 判断是否只包含 大写 小写 数字
        for kw in local_string:
            if kw.isupper() != True and kw.islower() != True and kw.isdigit() != True:
                return False
    return True

登录认证函数

该函数实现了用户登录认证的核心逻辑。首先对输入的用户名和密码进行验证,然后检查用户是否存在以及是否已经有生成的Token。如果用户存在但Token不存在,生成一个新的Token并存入数据库,最终返回生成的Token。

路由定义:

  • 使用@app.route("/login", methods=["POST"])定义了一个POST请求的路由,用于处理用户登录请求。

参数获取:

  • 通过request.form.to_dict()获取POST请求中的参数,包括用户名(username)和密码(password)。

参数验证:

  • 调用之前定义的CheckParameters函数对获取的用户名和密码进行合法性验证,确保其符合安全性和格式要求。

用户存在性验证:

  • 调用RunSqlite函数查询UserAuthDB表,验证用户名和密码是否匹配。如果存在匹配的用户,则继续执行下一步。

生成Token:

  • 查询SessionAuthDB表,检查是否存在该用户的Token记录。如果存在,则直接返回该Token。
  • 如果不存在Token记录,则生成一个32位的随机Token,并设置过期时间为当前时间戳加上360秒(6分钟)。

Token写入数据库:

  • 将生成的Token和过期时间写入SessionAuthDB表。

返回结果:

  • 返回生成的Token,作为登录成功的标识。
@app.route("/login",methods=["POST"])
def login():
    if request.method == "POST":
        # 获取参数信息
        obtain_dict = request.form.to_dict()
        if len(obtain_dict) != 0 and len(obtain_dict) == 2:
            username = obtain_dict["username"]
            password = obtain_dict["password"]

            # 验证是否合法
            is_true = CheckParameters(username,password)
            if is_true == True:

                # 查询是否存在该用户
                select = RunSqlite("./database.db", "UserAuthDB", "select", "username,password", f"username='{username}'")
                if select[0][0] == username and select[0][1] == password:
                    # 查询Session列表是否存在
                    select_session = RunSqlite("./database.db","SessionAuthDB","select","token",f"username='{username}'")
                    if select_session != []:
                        ref = {"message": ""}
                        ref["message"] = select_session[0][0]
                        return json.dumps(ref, ensure_ascii=False)

                    # Session不存在则需要重新生成
                    else:
                        # 生成并写入token和过期时间戳
                        token = ''.join(random.sample(string.ascii_letters + string.digits, 32))

                        # 设置360秒周期,过期时间
                        time_stamp = int(time.time()) + 360

                        insert = RunSqlite("./database.db", "SessionAuthDB", "insert", "username,token,invalid_date", f"'{username}','{token}',{time_stamp}")
                        if insert == True:
                            ref = {"message": ""}
                            ref["message"] = token
                            return json.dumps(ref, ensure_ascii=False)
                else:
                    return json.dumps("{'message': '用户名或密码错误'}", ensure_ascii=False)
            else:
                return json.dumps("{'message': '输入参数不可用'}", ensure_ascii=False)

    return json.dumps("{'message': '未知错误'}", ensure_ascii=False)

登录认证装饰器

检查用户登录状态Token是否过期的装饰器,装饰器用于装饰某一些函数,当主调函数被调用时,会优先执行装饰器内的代码,执行后根据装饰器执行结果返回或退出,装饰器分为两种模式,一种是FBV模式,另一种是CBV模式。

FBV(Function-Based Views)和CBV(Class-Based Views)是两种不同的视图设计模式,用于处理Web框架中的请求和生成响应。这两种模式在Django框架中被广泛使用。

FBV(Function-Based Views)

  • 定义: FBV是指使用普通的Python函数来处理请求和生成响应的视图设计模式。
  • 特点:
    • 每个视图对应一个函数,函数接收请求作为参数,返回响应。
    • 简单,易于理解和使用。
    • 视图的逻辑和处理集中在一个函数中。

示例:

def my_view(request):
    # 处理逻辑
    return HttpResponse("Hello, World!")

CBV(Class-Based Views)

  • 定义: CBV是指使用基于类的Python类来处理请求和生成响应的视图设计模式。
  • 特点:
    • 视图是类,每个类中可以包含多个方法来处理不同HTTP方法(GET、POST等)的请求。
    • 提供了更多的代码组织和复用的可能性,可以使用类的继承、Mixin等方式。
    • 更灵活,适用于复杂的业务逻辑和共享逻辑。

示例:

class MyView(View):
    def get(self, request):
        # 处理 GET 请求的逻辑
        return HttpResponse("Hello, World!")

    def post(self, request):
        # 处理 POST 请求的逻辑
        return HttpResponse("Received a POST request")

FBV与CBV区别

  1. 结构差异: FBV使用函数,逻辑较为集中;CBV使用类,允许通过类的继承和Mixin等方式更好地组织代码。
  2. 代码复用: CBV更容易实现代码复用,可以通过继承和Mixin在不同的类之间共享逻辑;而FBV需要显式地将共享逻辑提取为函数。
  3. 装饰器: 在FBV中,使用装饰器来添加额外的功能;而在CBV中,通过类的继承和Mixin来实现相似的功能。
  4. 可读性: 对于简单的视图逻辑,FBV可能更直观易懂;对于较为复杂的业务逻辑,CBV提供了更好的组织和扩展性。

在Flask中,两种设计模式都可以使用,开发者可以根据项目的需求和个人喜好选择使用FBV或CBV。

基于FBV的装饰器设置使用时,需要注意装饰器嵌入的位置,装饰器需要在请求进入路由之前,即在请求未走原逻辑代码的时候介入,对原业务逻辑进行业务拓展。

from flask import Flask, request,render_template
from functools import wraps

app = Flask(__name__)

def login(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("登录请求: {}".format(request.url))

        value = request.form.get("value")
        if value == "lyshark":
            # 调用原函数,并返回
            function_ptr = func(*args, **kwargs)
            return function_ptr
        else:
            return "登录失败"
    return wrapper

@app.route('/', methods=['GET', 'POST'])
@login
def index():
    if request.method == "POST":
        value = request.form.get("value")
    return "index"

if __name__ == '__main__':
    app.run()

而基于CBV的装饰器设置,使用就显得更加细分化,可以定制管理专属功能,在外部定义装饰器可以全局使用,内部定义可以针对特定路由函数特殊处理。

from flask import Flask, request,render_template,views
from functools import wraps

app = Flask(__name__)

# 装饰器
def login(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("登录请求: {}".format(request.url))

        value = request.form.get("value")
        if value == "lyshark":
            # 调用原函数,并返回
            function_ptr = func(*args, **kwargs)
            return function_ptr
        else:
            return "登录失败"
    return wrapper

# 类视图
class index(views.MethodView):
    @login
    def get(self):
        return request.args

    @login
    def post(self):
        return "success"

# 增加路由
app.add_url_rule(rule='/', view_func=index.as_view('index'))

if __name__ == '__main__':
    app.run()

此处为了实现起来更简单一些此处直接使用FBV模式,我们实现的login_check装饰器通过FVB模式构建,代码中取得用户的Token以及用户名对用户身份进行验证。

def login_check(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("处理登录逻辑部分: {}".format(request.url))

        # 得到token 验证是否登陆了,且token没有过期
        local_timestamp = int(time.time())
        get_token = request.headers.get("token")

        # 验证传入参数是否合法
        if CheckParameters(get_token) == True:
            select = RunSqlite("database.db","SessionAuthDB","select","token,invalid_date",f"token='{get_token}'")
            print(select)
            # 判断是否存在记录,如果存在,在判断时间戳是否合理
            if select != []:
                # 如果当前时间与数据库比对,大于说明过期了需要删除原来的,让用户重新登录
                if local_timestamp >= int(select[0][1]):
                    print("时间戳过期了")
                    # 删除原来的Token
                    delete = RunSqlite("database.db","SessionAuthDB","delete",f"token='{get_token}'","none")
                    if delete == True:
                        return json.dumps("{'token': 'Token 已过期,请重新登录获取'}", ensure_ascii=False)
                    else:
                        return json.dumps("{'token': '数据库删除异常,请联系开发者'}", ensure_ascii=False)
                else:
                    # 验证Token是否一致
                    if select[0][0] == get_token:
                        print("Token验证正常,继续执行function_ptr指向代码.")
                        # 返回到原函数
                        return func(*args, **kwargs)
                    else:
                        print("Token验证错误 {}".format(select))
                        return json.dumps("{'token': 'Token 传入错误'}", ensure_ascii=False)

            # 装饰器调用原函数
            # function_ptr = func(*args, **kwargs)

        return json.dumps("{'token': 'Token 验证失败'}", ensure_ascii=False)
    return wrapper

调用演示

主调用函数则是具体的功能实现可以自定义扩展,当用户访问该路由时会优先调用login_check装饰器来验证用户携带Token的合法性,如果合法则会通过return func(*args, **kwargs)返回执行主调函数,否则直接返回验证失败的消息。

# 获取参数函数
@app.route("/GetPage", methods=["POST"])
@login_check
def GetPage():
    if request.method == "POST":
        # 获取参数信息
        obtain_dict = request.form.to_dict()
        if len(obtain_dict) != 0 and len(obtain_dict) == 1:

            pagename = obtain_dict["pagename"]
            print("查询名称: {}".format(obtain_dict["pagename"]))

            # 相应头的完整写法
            req = Response(response="ok", status=200, mimetype="application/json")
            req.headers["Content-Type"] = "text/json; charset=utf-8"
            req.headers["Server"] = "LyShark Server 1.0"
            req.data = json.dumps("{'message': 'hello world'}")
            return req
        else:
            return json.dumps("{'message': '传入参数错误,请携带正确参数请求'}", ensure_ascii=False)
    return json.dumps("{'token': '未知错误'}", ensure_ascii=False)

# 用户注册函数
@app.route("/register", methods=["POST"])
def Register():
    if request.method == "POST":
        obtain_dict = request.form.to_dict()
        if len(obtain_dict) != 0 and len(obtain_dict) == 2:

            print("用户名: {} 密码: {}".format(obtain_dict["username"], obtain_dict["password"]))
            reg_username = obtain_dict["username"]
            reg_password = obtain_dict["password"]

            # 验证是否合法
            if CheckParameters(reg_username, reg_password) == False:
                return json.dumps("{'message': '传入用户名密码不合法'}", ensure_ascii=False)

            # 查询用户是否存在
            select = RunSqlite("database.db","UserAuthDB","select","id",f"username='{reg_username}'")
            if select != []:
                return json.dumps("{'message': '用户名已被注册'}", ensure_ascii=False)
            else:
                insert = RunSqlite("database.db","UserAuthDB","insert","username,password",f"'{reg_username}','{reg_password}'")
                if insert == True:
                    return json.dumps("{'message': '注册成功'}", ensure_ascii=False)
                else:
                    return json.dumps("{'message': '注册失败'}", ensure_ascii=False)
        else:
            return json.dumps("{'message': '传入参数个数不正确'}", ensure_ascii=False)
    return json.dumps("{'message': '未知错误'}", ensure_ascii=False)

# 密码修改函数
@app.route("/modify", methods=["POST"])
@login_check
def modify():
    if request.method == "POST":
        obtain_dict = request.form.to_dict()
        if len(obtain_dict) != 0 and len(obtain_dict) == 1:

            mdf_password = obtain_dict["password"]
            get_token = request.headers.get("token")
            print("获取token: {} 修改后密码: {}".format(get_token,mdf_password))

            # 验证是否合法
            if CheckParameters(get_token, mdf_password) == False:
                return json.dumps("{'message': '传入密码不合法'}", ensure_ascii=False)

            # 先得到token对应用户名
            select = RunSqlite("database.db","SessionAuthDB","select","username",f"token='{get_token}'")
            if select != []:
                # 接着直接修改密码即可
                modify_username = str(select[0][0])
                print("得到的用户名: {}".format(modify_username))
                update = RunSqlite("database.db","UserAuthDB","update",f"username='{modify_username}'",f"password='{mdf_password}'")
                if update == True:
                    # 删除原来的token,让用户重新获取
                    delete = RunSqlite("database.db","SessionAuthDB","delete",f"username='{modify_username}'","none")
                    print("删除token状态: {}".format(delete))
                    return json.dumps("{'message': '修改成功,请重新登录获取Token'}", ensure_ascii=False)

                else:
                    return json.dumps("{'message': '修改失败'}", ensure_ascii=False)
            else:
                return json.dumps("{'message': '不存在该Token,无法修改密码'}", ensure_ascii=False)
        else:
            return json.dumps("{'message': '传入参数个数不正确'}", ensure_ascii=False)
    return json.dumps("{'message': '未知错误'}", ensure_ascii=False)

FBV模式下的完整代码,以下是对代码的概述:

主要功能:

  1. 数据库操作: 封装了对 SQLite 数据库的基本增删改查操作(RunSqlite 函数)。
  2. 用户认证: 提供了用户登录、注册和密码修改的功能。使用了 Token 机制进行登录认证,并通过装饰器 login_check 来验证 Token 的有效性。
  3. 创建数据库表: 提供了一个用于初始化数据库表结构的接口 /create
  4. 获取页面信息: 通过 /GetPage 接口,使用了 login_check 装饰器来验证用户登录状态,仅对已登录用户提供页面信息。

主要路由

  • /create:创建数据库表结构。
  • /login:用户登录接口,返回用户的 Token。
  • /GetPage:获取页面信息,需要用户登录并携带有效 Token。
  • /register:用户注册接口。
  • /modify:修改用户密码接口,需要用户登录并携带有效 Token。

代码结构

  1. 数据库操作:
    • 提供了对 SQLite 数据库的基本操作,包括插入、更新、查询和删除。
  2. 用户认证:
    • 使用了装饰器 login_check 对需要登录的路由进行认证。
    • 提供了用户登录、注册和密码修改的路由。
  3. 创建数据库表:
    • 提供了一个用于初始化数据库表结构的路由。
  4. 获取页面信息:
    • 提供了一个用于获取页面信息的路由,需要用户登录并携带有效 Token。
from flask import Flask,render_template,request,Response,redirect,jsonify
from functools import wraps
import json,sqlite3,random,string,time

app = Flask(__name__)

# 增删改查简单封装
def RunSqlite(db,table,action,field,value):
    connect = sqlite3.connect(db)
    cursor = connect.cursor()

    # 执行插入动作
    if action == "insert":
        insert = f"insert into {table}({field}) values({value});"
        if insert == None or len(insert) == 0:
            return False
        try:
            cursor.execute(insert)
        except Exception:
            return False

    # 执行更新操作
    elif action == "update":
        update = f"update {table} set {value} where {field};"
        if update == None or len(update) == 0:
            return False
        try:
            cursor.execute(update)
        except Exception:
            return False

    # 执行查询操作
    elif action == "select":

        # 查询条件是否为空
        if value == "none":
            select = f"select {field} from {table};"
        else:
            select = f"select {field} from {table} where {value};"

        try:
            ref = cursor.execute(select)
            ref_data = ref.fetchall()
            connect.commit()
            connect.close()
            return ref_data
        except Exception:
            return False

    # 执行删除操作
    elif action == "delete":
        delete = f"delete from {table} where {field};"
        if delete == None or len(delete) == 0:
            return False
        try:
            cursor.execute(delete)
        except Exception:
            return False
    try:
        connect.commit()
        connect.close()
        return True
    except Exception:
        return False

@app.route("/create",methods=["GET"])
def create():
    conn = sqlite3.connect("./database.db")
    cursor = conn.cursor()
    create_auth = "create table UserAuthDB(" \
             "id INTEGER primary key AUTOINCREMENT not null unique," \
             "username varchar(64) not null unique," \
             "password varchar(64) not null" \
             ")"
    cursor.execute(create_auth)

    create_session = "create table SessionAuthDB(" \
                     "id INTEGER primary key AUTOINCREMENT not null unique," \
                     "username varchar(64) not null unique," \
                     "token varchar(128) not null unique," \
                     "invalid_date int not null" \
                     ")"

    cursor.execute(create_session)
    conn.commit()
    cursor.close()
    conn.close()
    return "create success"

# 验证用户名密码是否合法
def CheckParameters(*kwargs):
    for item in range(len(kwargs)):
        # 先验证长度
        if len(kwargs[item]) >= 256 or len(kwargs[item]) == 0:
            return False

        # 先小写,然后去掉两侧空格,去掉所有空格
        local_string = kwargs[item].lower().strip().replace(" ","")

        # 判断是否只包含 大写 小写 数字
        for kw in local_string:
            if kw.isupper() != True and kw.islower() != True and kw.isdigit() != True:
                return False
    return True

# 登录认证模块
@app.route("/login",methods=["POST"])
def login():
    if request.method == "POST":
        # 获取参数信息
        obtain_dict = request.form.to_dict()
        if len(obtain_dict) != 0 and len(obtain_dict) == 2:
            username = obtain_dict["username"]
            password = obtain_dict["password"]

            # 验证是否合法
            is_true = CheckParameters(username,password)
            if is_true == True:

                # 查询是否存在该用户
                select = RunSqlite("./database.db", "UserAuthDB", "select", "username,password", f"username='{username}'")
                if select[0][0] == username and select[0][1] == password:
                    # 查询Session列表是否存在
                    select_session = RunSqlite("./database.db","SessionAuthDB","select","token",f"username='{username}'")
                    if select_session != []:
                        ref = {"message": ""}
                        ref["message"] = select_session[0][0]
                        return json.dumps(ref, ensure_ascii=False)

                    # Session不存在则需要重新生成
                    else:
                        # 生成并写入token和过期时间戳
                        token = ''.join(random.sample(string.ascii_letters + string.digits, 32))

                        # 设置360秒周期,过期时间
                        time_stamp = int(time.time()) + 360

                        insert = RunSqlite("./database.db", "SessionAuthDB", "insert", "username,token,invalid_date", f"'{username}','{token}',{time_stamp}")
                        if insert == True:
                            ref = {"message": ""}
                            ref["message"] = token
                            return json.dumps(ref, ensure_ascii=False)
                else:
                    return json.dumps("{'message': '用户名或密码错误'}", ensure_ascii=False)
            else:
                return json.dumps("{'message': '输入参数不可用'}", ensure_ascii=False)

    return json.dumps("{'message': '未知错误'}", ensure_ascii=False)

# 检查登录状态 token是否过期的装饰器
def login_check(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        print("处理登录逻辑部分: {}".format(request.url))

        # 得到token 验证是否登陆了,且token没有过期
        local_timestamp = int(time.time())
        get_token = request.headers.get("token")

        # 验证传入参数是否合法
        if CheckParameters(get_token) == True:
            select = RunSqlite("./database.db","SessionAuthDB","select","token,invalid_date",f"token='{get_token}'")
            print(select)
            # 判断是否存在记录,如果存在,在判断时间戳是否合理
            if select != []:
                # 如果当前时间与数据库比对,大于说明过期了需要删除原来的,让用户重新登录
                if local_timestamp >= int(select[0][1]):
                    print("时间戳过期了")
                    # 删除原来的Token
                    delete = RunSqlite("./database.db","SessionAuthDB","delete",f"token='{get_token}'","none")
                    if delete == True:
                        return json.dumps("{'token': 'Token 已过期,请重新登录获取'}", ensure_ascii=False)
                    else:
                        return json.dumps("{'token': '数据库删除异常,请联系开发者'}", ensure_ascii=False)
                else:
                    # 验证Token是否一致
                    if select[0][0] == get_token:
                        print("Token验证正常,继续执行function_ptr指向代码.")
                        # 返回到原函数
                        return func(*args, **kwargs)
                    else:
                        print("Token验证错误 {}".format(select))
                        return json.dumps("{'token': 'Token 传入错误'}", ensure_ascii=False)

            # 装饰器调用原函数
            # function_ptr = func(*args, **kwargs)

        return json.dumps("{'token': 'Token 验证失败'}", ensure_ascii=False)
    return wrapper

# 获取参数函数
@app.route("/GetPage", methods=["POST"])
@login_check
def GetPage():
    if request.method == "POST":
        # 获取参数信息
        obtain_dict = request.form.to_dict()
        if len(obtain_dict) != 0 and len(obtain_dict) == 1:

            pagename = obtain_dict["pagename"]
            print("查询名称: {}".format(obtain_dict["pagename"]))

            # 相应头的完整写法
            req = Response(response="ok", status=200, mimetype="application/json")
            req.headers["Content-Type"] = "text/json; charset=utf-8"
            req.headers["Server"] = "LyShark Server 1.0"
            req.data = json.dumps("{'message': 'hello world'}")
            return req
        else:
            return json.dumps("{'message': '传入参数错误,请携带正确参数请求'}", ensure_ascii=False)
    return json.dumps("{'token': '未知错误'}", ensure_ascii=False)

# 用户注册函数
@app.route("/register", methods=["POST"])
def Register():
    if request.method == "POST":
        obtain_dict = request.form.to_dict()
        if len(obtain_dict) != 0 and len(obtain_dict) == 2:

            print("用户名: {} 密码: {}".format(obtain_dict["username"], obtain_dict["password"]))
            reg_username = obtain_dict["username"]
            reg_password = obtain_dict["password"]

            # 验证是否合法
            if CheckParameters(reg_username, reg_password) == False:
                return json.dumps("{'message': '传入用户名密码不合法'}", ensure_ascii=False)

            # 查询用户是否存在
            select = RunSqlite("database.db","UserAuthDB","select","id",f"username='{reg_username}'")
            if select != []:
                return json.dumps("{'message': '用户名已被注册'}", ensure_ascii=False)
            else:
                insert = RunSqlite("database.db","UserAuthDB","insert","username,password",f"'{reg_username}','{reg_password}'")
                if insert == True:
                    return json.dumps("{'message': '注册成功'}", ensure_ascii=False)
                else:
                    return json.dumps("{'message': '注册失败'}", ensure_ascii=False)
        else:
            return json.dumps("{'message': '传入参数个数不正确'}", ensure_ascii=False)
    return json.dumps("{'message': '未知错误'}", ensure_ascii=False)

# 密码修改函数
@app.route("/modify", methods=["POST"])
@login_check
def modify():
    if request.method == "POST":
        obtain_dict = request.form.to_dict()
        if len(obtain_dict) != 0 and len(obtain_dict) == 1:

            mdf_password = obtain_dict["password"]
            get_token = request.headers.get("token")
            print("获取token: {} 修改后密码: {}".format(get_token,mdf_password))

            # 验证是否合法
            if CheckParameters(get_token, mdf_password) == False:
                return json.dumps("{'message': '传入密码不合法'}", ensure_ascii=False)

            # 先得到token对应用户名
            select = RunSqlite("./database.db","SessionAuthDB","select","username",f"token='{get_token}'")
            if select != []:
                # 接着直接修改密码即可
                modify_username = str(select[0][0])
                print("得到的用户名: {}".format(modify_username))
                update = RunSqlite("database.db","UserAuthDB","update",f"username='{modify_username}'",f"password='{mdf_password}'")
                if update == True:
                    # 删除原来的token,让用户重新获取
                    delete = RunSqlite("./database.db","SessionAuthDB","delete",f"username='{modify_username}'","none")
                    print("删除token状态: {}".format(delete))
                    return json.dumps("{'message': '修改成功,请重新登录获取Token'}", ensure_ascii=False)

                else:
                    return json.dumps("{'message': '修改失败'}", ensure_ascii=False)
            else:
                return json.dumps("{'message': '不存在该Token,无法修改密码'}", ensure_ascii=False)
        else:
            return json.dumps("{'message': '传入参数个数不正确'}", ensure_ascii=False)
    return json.dumps("{'message': '未知错误'}", ensure_ascii=False)

if __name__ == '__main__':
    app.run(debug=True)

首先需要在Web页面访问http://127.0.0.1/create路径实现对数据库的初始化,并打开Postman工具,通过传入参数来使用这个案例。

Flask 实现Token认证机制文章来源地址https://www.toymoban.com/news/detail-747706.html

到了这里,关于Flask 实现Token认证机制的文章就介绍完了。如果您还想了解更多内容,请在右上角搜索TOY模板网以前的文章或继续浏览下面的相关文章,希望大家以后多多支持TOY模板网!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处: 如若内容造成侵权/违法违规/事实不符,请点击违法举报进行投诉反馈,一经查实,立即删除!

领支付宝红包赞助服务器费用

相关文章

  • 【Python】Python Flask token身份认证(附完整代码)

    【Python】Python Flask token身份认证(附完整代码)

    前言 Python Flask是一个使用Python编写的轻量级Web应用框架,它可以非常方便地搭建Web应用。在Web应用中,经常需要进行身份认证,以确保只有授权用户才能访问某些资源。本文将介绍如何使用token进行身份认证,以及如何在Python Flask中实现token身份认证。 一、什么是token身份认

    2024年02月10日
    浏览(11)
  • 一文搞定fastapi+token认证机制(附全部源码)

    一文搞定fastapi+token认证机制(附全部源码)

    fastapi最近越发流行,比django轻便易用,尤其是对于后端编写,其轻便的优势越发明显,可是如果是需要和前端进行统一认证需自行设计认证能力,此文就将fastapi+token的实现代码全部展现,希望对你有帮助 第一步: 首先现行搭建基本的fastapi框架 python第三方库需要安装 fasta

    2024年01月21日
    浏览(8)
  • Python 框架学习 Django篇 (五) Session与Token认证

    Python 框架学习 Django篇 (五) Session与Token认证

    我们前面经过数据库的学习已经基本了解了怎么接受前端发过来的请求,并处理后返回数据实现了一个基本的登录登出效果,但是存在一个问题,我们是将所有的请求都直接处理了,并没有去检查是否为已经登录的管理员发送的,如果是这样的话客户端可以不选择登录直接去

    2024年02月07日
    浏览(10)
  • Flask_实现token鉴权

    Flask_实现token鉴权

    目录 1、安装依赖 2、实现代码 3、测试 源码等资料获取方法 流程图 请求接口不传token 请求接口传有效token 请求接口传失效token 各位想获取源码等教程资料的朋友请 点赞 + 评论 + 收藏 ,三连! 三连 之后我会在评论区挨个私信发给你们~

    2024年02月17日
    浏览(11)
  • python flask 令牌token原理及代码实现

    觉得废话多,可以直接看代码 代码参考:http://t.csdn.cn/Sf8km 令牌token解决了什么问题 解决http请求无状态的特性,让每次请求都有状态,知道请求是哪个用户发来的 首先要知道,http请求是无状态的 也就是说,即使是同一个人发送的两次请求,服务器也是不知道是同一个人过来

    2024年02月12日
    浏览(9)
  • Websocket实现token认证方式

    Websocket实现token认证方式 1.将token放到header中 java客户端代码展示 2.将token放在url上 java客户端代码展示 3.将token放到 WebSockets的Sec-WebSocket-Protocol中 java客户端代码展示 第一种方式很好理解,正常http协议也是这么做的;但是前端的websocket client js 不支持传递header,所以就出现了第

    2024年02月12日
    浏览(13)
  • Java实现基于token认证

    Java实现基于token认证

    Jwt全称是:json web token,以JSON对象的形式安全的传递信息。它将用户信息加密到token里,服务器不保存任何用户信息。服务器通过使用保存的密钥验证token的正确性,只要正确即通过验证。 用户登录返回token进行验证的流程: 用户使用账号发出post请求; 服务器使用私钥创建一

    2024年02月05日
    浏览(9)
  • Flask框架【before_first_request和before_request详解、钩子函数、Flask_信号机制】(七)

    Flask框架【before_first_request和before_request详解、钩子函数、Flask_信号机制】(七)

    👏作者简介:大家好,我是爱敲代码的小王,CSDN博客博主,Python小白 📕系列专栏:python入门到实战、Python爬虫开发、Python办公自动化、Python数据分析、Python前后端开发 📧如果文章知识点有错误的地方,请指正!和大家一起学习,一起进步👀 🔥如果感觉博主的文章还不错的

    2024年02月07日
    浏览(7)
  • Springboot+redis实现token的登录认证

    Springboot+redis实现token的登录认证

    1.在用户登录后,如果要访问其他路径下的资源的话,我们是否还需要再验证一遍呢?而且我们登陆上系统长时间不操作的话还需不需要再次验证?所以这种情况下就很需要token来实现登录功能。并通过redis(redis是一个key-value存储系统,它支持存储的value类型相对更多,包括

    2023年04月09日
    浏览(6)
  • SpringSecurity实现前后端分离登录token认证详解

    SpringSecurity实现前后端分离登录token认证详解

    目录 1. SpringSecurity概述 1.1 权限框架 1.1.1 Apache Shiro 1.1.2 SpringSecurity 1.1.3 权限框架的选择 1.2 授权和认证 1.3 SpringSecurity的功能 2.SpringSecurity 实战 2.1 引入SpringSecurity 2.2 认证 2.2.1 登录校验流程  2.2.2 SpringSecurity完整流程  2.2.3 认证流程详解 2.3 思路分析 2.4 代码实战 2.4.1  自定义

    2024年02月08日
    浏览(40)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

博客赞助

微信扫一扫打赏

请作者喝杯咖啡吧~博客赞助

支付宝扫一扫领取红包,优惠每天领

二维码1

领取红包

二维码2

领红包