JWT Auth

160 views
Skip to first unread message

Samuel Marques

unread,
Apr 27, 2020, 11:24:53 AM4/27/20
to cherrypy-users
Hello everyone, I'm new here, so let me introduce myself: my name is Sam Marxz, I'm 22 years old, and I'm a Brazilian front-end developer working with React.

I'm learning about REST APIs from the back-end side, and since I have a reasonable knowledge of Python I decided that it would be a good idea to create an API with a mini framework to really learn how things are done. I chose CherryPy for its simplicity (unlike Flask, which is a great framework but already comes with a lot built in). I did the CRUD part and everything was OK, but I want to add authentication and route protection with JWT.



(I couldn't put this text below the code.)
- How can I pass information to the route via a Tool?
- Is the code OK? Am I doing it right?
- How do I pass the db to the tool so it can do the check?

Thanks in advance



Here's the code:

app.py 

import os
import cherrypy

from cp_sqlalchemy import SQLAlchemyTool, SQLAlchemyPlugin

from models import Base
from resources import LoginResource, UserResource, WebsiteResource



# Directory containing this script; the SQLite file is kept next to it.
HERE = os.path.dirname(os.path.abspath(__file__))


if __name__ == '__main__':
    # Register the SQLAlchemy tool under the name that 'tools.db.on' refers to.
    cherrypy.tools.db = SQLAlchemyTool()

    # Shared per-application config: dispatch on HTTP method, enable the db
    # tool, and set a JSON Content-Type response header.
    conf = {
        '/': {
            'request.dispatch': cherrypy.dispatch.MethodDispatcher(),
            'tools.response_headers.on': True,
            'tools.db.on': True,
            'tools.response_headers.headers': [
                ('Content-Type', 'application/json'),
            ],
        },
    }

    cherrypy.tree.mount(LoginResource(), '/api/login', config=conf)
    cherrypy.tree.mount(UserResource(), '/api/user', config=conf)
    cherrypy.tree.mount(WebsiteResource(), '/api/websites', config=conf)

    dbfile = os.path.join(HERE, 'test.db')

    # Append mode creates the file when it is missing and never truncates an
    # existing one, so no separate existence check is needed.
    with open(dbfile, 'a'):
        pass

    sqlalchemy_plugin = SQLAlchemyPlugin(
        cherrypy.engine, Base, 'sqlite:///%s' % dbfile,
        echo=True,
    )

    sqlalchemy_plugin.subscribe()
    sqlalchemy_plugin.create()
    cherrypy.engine.start()
    cherrypy.engine.block()

tools.py
import jwt
import cherrypy

from models import User
from config import SECRET_KEY


@cherrypy.tools.register('before_handler')
def check_jwt():
    """Reject the request unless it carries a valid JWT for a known user.

    The token is read from the ``Authorization`` header, which is expected to
    look like ``<scheme> <token>``. It is verified with ``SECRET_KEY`` using
    HS256, and its ``username`` claim is looked up through the database
    session at ``cherrypy.request.db``. On success the matching ``User`` is
    stored on ``cherrypy.request.user`` so the page handler can read it.

    Raises:
        cherrypy.HTTPError: 400 when the token is missing, expired or
            invalid; 401 when the token does not name an existing user;
            500 on any other failure while decoding.
    """
    header = cherrypy.request.headers.get('Authorization') or ''
    # A missing header or one without a second field yields an empty token
    # instead of an AttributeError/IndexError.
    fields = header.split()
    token = fields[1] if len(fields) > 1 else ''

    if not token:
        raise cherrypy.HTTPError(400, 'You must provide JWT token')

    try:
        # PyJWT takes the accepted algorithms as the ``algorithms`` list.
        info = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
    except jwt.ExpiredSignatureError:
        raise cherrypy.HTTPError(400, 'Token expirado') from None
    except jwt.DecodeError:
        raise cherrypy.HTTPError(400, 'Token inválido') from None
    except Exception as exc:
        raise cherrypy.HTTPError(500, 'Error in System') from exc

    # NOTE(review): assumes the db tool is enabled for this route so that
    # cherrypy.request.db holds a session — confirm in the app config.
    username = info.get('username')
    user = cherrypy.request.db.query(User).filter_by(username=username).first()
    if user is None:
        raise cherrypy.HTTPError(401, 'Token inválido')

    # Hand the authenticated user over to the page handler.
    cherrypy.request.user = user

resources.py
import jwt
import datetime
import cherrypy

from models import Website, User
from config import SECRET_KEY
from tools import check_jwt

class BaseResource:
    """Common base for the API resources; exposes the request's db object."""

    @property
    def db(self):
        """Return whatever is stored on ``cherrypy.request.db``."""
        session = cherrypy.request.db
        return session


@cherrypy.expose
class UserResource(BaseResource):
    """Resource that registers new users."""

    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    def POST(self):
        """Register new user.

        Reads ``username`` and ``password`` from the JSON request body,
        stores a new ``User`` with a hashed password and returns a message
        plus a one-day HS256 access token.

        Raises:
            cherrypy.HTTPError: 400 when a field is missing or the username
                is already taken; 500 when the user cannot be saved.
        """
        username = cherrypy.request.json.get('username')
        password = cherrypy.request.json.get('password')

        if not username or not password:
            raise cherrypy.HTTPError(400, 'Username and Password are required')

        if self.db.query(User).filter_by(username=username).first():
            raise cherrypy.HTTPError(400, 'User already exists')

        # Only the database write is guarded, so the success path below is
        # not swallowed by the error handler.
        try:
            self.db.add(User(username=username, password=User.generate_hash(password)))
            self.db.commit()
        except Exception as exc:
            self.db.rollback()
            raise cherrypy.HTTPError(500, 'Something went wrong') from exc

        pay_load = {
            'username': username,
            'exp': datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=1),
        }
        access_token = jwt.encode(pay_load, SECRET_KEY, algorithm='HS256')
        # jwt.encode returns bytes in PyJWT 1.x and str in 2.x.
        if isinstance(access_token, bytes):
            access_token = access_token.decode('utf-8')

        return {
            'message': 'User {} was created'.format(username),
            'access_token': 'Bearer {}'.format(access_token),
        }


@cherrypy.expose
class LoginResource(BaseResource):
    """Resource that exchanges a username and password for a JWT."""

    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    def POST(self):
        """Login user.

        Reads ``username`` and ``password`` from the JSON request body and,
        when they match a stored user, returns a message plus a one-day
        HS256 access token.

        Raises:
            cherrypy.HTTPError: 400 when a field is missing; 404 when the
                user does not exist; 401 when the password does not match.
        """
        username = cherrypy.request.json.get('username')
        password = cherrypy.request.json.get('password')

        if not username or not password:
            raise cherrypy.HTTPError(400, 'Username and Password are required')

        current_user = self.db.query(User).filter_by(username=username).first()

        # NOTE(review): a distinct 404 lets clients probe which usernames
        # exist — consider answering like the wrong-password case below.
        if not current_user:
            raise cherrypy.HTTPError(404, 'User not found')

        # A failed login must not answer with a success status.
        if not User.verify_hash(password, current_user.password):
            raise cherrypy.HTTPError(401, 'Invalid username or password')

        pay_load = {
            'username': current_user.username,
            'exp': datetime.datetime.now(datetime.timezone.utc) + datetime.timedelta(days=1),
        }
        access_token = jwt.encode(pay_load, SECRET_KEY, algorithm='HS256')
        # jwt.encode returns bytes in PyJWT 1.x and str in 2.x.
        if isinstance(access_token, bytes):
            access_token = access_token.decode('utf-8')

        return {
            'message': 'Logged in as {}'.format(current_user.username),
            'access_token': 'Bearer {}'.format(access_token),
        }


@cherrypy.expose
class WebsiteResource(BaseResource):
    """CRUD resource for ``Website`` rows."""

    @cherrypy.tools.json_out()
    def GET(self):
        """Return every stored website as ``{"websites": [...]}``."""
        websites = self.db.query(Website).all()
        return {"websites": [website.to_dict() for website in websites]}

    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    @cherrypy.tools.check_jwt()
    def POST(self):
        """Create a website from the JSON body's ``title`` and ``url``.

        This route is protected: the ``check_jwt`` tool runs before the
        handler is entered.
        """
        title = cherrypy.request.json.get('title')
        url = cherrypy.request.json.get('url')

        self.db.add(Website(title=title, url=url))
        self.db.commit()

        return {'message': 'Website {} created!'.format(title)}

    @cherrypy.tools.json_in()
    @cherrypy.tools.json_out()
    def PUT(self, id):
        """Update the ``title`` and ``url`` of the website with this ``id``.

        Raises:
            cherrypy.HTTPError: 404 when no website has the given id.
        """
        title = cherrypy.request.json.get('title')
        url = cherrypy.request.json.get('url')

        # Query.update() returns the number of rows matched.
        matched = self.db.query(Website).filter_by(id=id).update(
            {"title": title, "url": url}
        )
        if not matched:
            raise cherrypy.HTTPError(404, 'Website not found')
        # Commit explicitly, as POST does.
        self.db.commit()

        return {'message': 'Website {} updated!'.format(title)}

    @cherrypy.tools.json_out()
    def DELETE(self, id):
        """Delete the website with this ``id``.

        Raises:
            cherrypy.HTTPError: 404 when no website has the given id.
        """
        # Query.delete() returns the number of rows deleted.
        deleted = self.db.query(Website).filter_by(id=id).delete()
        if not deleted:
            raise cherrypy.HTTPError(404, 'Website not found')
        # Commit explicitly, as POST does.
        self.db.commit()

        return {'message': 'Deleted!'}


models.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base
from passlib.hash import pbkdf2_sha256 as sha256

from sqlalchemy_serializer import SerializerMixin


# Declarative base class that the model classes in this module inherit from.
Base = declarative_base()


class User(Base, SerializerMixin):
    """Account row stored in the ``users`` table."""

    __tablename__ = 'users'

    id = Column(Integer, primary_key=True)
    username = Column(String(60), unique=True, nullable=False)
    password = Column(String(120), nullable=False)

    @staticmethod
    def generate_hash(password):
        """Return the pbkdf2_sha256 hash of ``password``."""
        hashed = sha256.hash(password)
        return hashed

    @staticmethod
    def verify_hash(password, hash):
        """Return the result of verifying ``password`` against ``hash``."""
        matches = sha256.verify(password, hash)
        return matches


class Website(Base, SerializerMixin):
    """Row of the ``websites`` table: an id, a title and a URL."""

    __tablename__ = 'websites'

    id = Column(Integer, primary_key=True)
    title = Column(String(120))
    url = Column(String(100))

    def __repr__(self):
        """Return a short label built from the website's title."""
        label = f"Website {self.title}"
        return label

Valera Malinovsky

unread,
Aug 16, 2020, 5:33:32 AM8/16/20
to cherrypy-users
Hello

I'm also interested in how to pass user information between requests.

Valera Malinovsky

unread,
Aug 16, 2020, 5:49:09 AM8/16/20
to cherrypy-users
import cherrypy


@cherrypy.tools.register('before_handler')
def required_user():
    """Set a fixed user name on the current request object."""
    setattr(cherrypy.request, 'user', 'Sam Marxz')


class Root:
    """Demo application whose index page greets the request's user."""

    @cherrypy.expose
    @cherrypy.tools.required_user()
    def index(self):
        """Return a greeting built from ``cherrypy.request.user``."""
        greeting = "hello {0}".format(cherrypy.request.user)
        return greeting


Reply all
Reply to author
Forward
0 new messages