feat: Try to add more info to user info route

This commit is contained in:
Nathan Woodburn 2024-06-13 16:07:38 +10:00
parent bfbd986469
commit 82eb3b925b
Signed by: nathanwoodburn
GPG Key ID: 203B000478AD0EF1


@ -9,12 +9,12 @@ from .models import db, User, OAuth2Client
from .oauth2 import authorization, require_oauth from .oauth2 import authorization, require_oauth
bp = Blueprint('home', __name__) bp = Blueprint("home", __name__)
def current_user(): def current_user():
if 'id' in session: if "id" in session:
uid = session['id'] uid = session["id"]
return User.query.get(uid) return User.query.get(uid)
return None return None
@ -23,24 +23,24 @@ def split_by_crlf(s):
return [v for v in s.splitlines() if v] return [v for v in s.splitlines() if v]
@bp.route('/', methods=('GET', 'POST')) @bp.route("/", methods=("GET", "POST"))
def home(): def home():
next_page = request.args.get('next') next_page = request.args.get("next")
if request.method == 'POST': if request.method == "POST":
auth = varo_auth_flask_login(request) auth = varo_auth_flask_login(request)
if auth == False: if auth == False:
return redirect('/?error=login_failed') return redirect("/?error=login_failed")
print(auth) print(auth)
user = User.query.filter_by(username=auth).first() user = User.query.filter_by(username=auth).first()
if not user: if not user:
user = User(username=auth) user = User(username=auth)
db.session.add(user) db.session.add(user)
db.session.commit() db.session.commit()
session['id'] = user.id session["id"] = user.id
# if user is not just to log in, but need to head back to the auth page, then go for it # if user is not just to log in, but need to head back to the auth page, then go for it
if next_page: if next_page:
return redirect(next_page) return redirect(next_page)
return redirect('/') return redirect("/")
user = current_user() user = current_user()
if user: if user:
clients = OAuth2Client.query.filter_by(user_id=user.id).all() clients = OAuth2Client.query.filter_by(user_id=user.id).all()
@ -48,27 +48,27 @@ def home():
return redirect(next_page) return redirect(next_page)
else: else:
clients = [] clients = []
return render_template('home.html', user=user, clients=clients) return render_template("home.html", user=user, clients=clients)
@bp.route('/logout') @bp.route("/logout")
def logout(): def logout():
del session['id'] del session["id"]
next = request.args.get('next') next = request.args.get("next")
if next: if next:
return redirect(url_for('home.home', next=next)) return redirect(url_for("home.home", next=next))
return redirect('/') return redirect("/")
@bp.route('/create_client', methods=('GET', 'POST')) @bp.route("/create_client", methods=("GET", "POST"))
def create_client(): def create_client():
user = current_user() user = current_user()
if not user: if not user:
return redirect('/') return redirect("/")
if request.method == 'GET': if request.method == "GET":
return render_template('create_client.html') return render_template("create_client.html")
client_id = gen_salt(24) client_id = gen_salt(24)
client_id_issued_at = int(time.time()) client_id_issued_at = int(time.time())
@ -86,109 +86,133 @@ def create_client():
"redirect_uris": split_by_crlf(form["redirect_uri"]), "redirect_uris": split_by_crlf(form["redirect_uri"]),
"response_types": split_by_crlf(form["response_type"]), "response_types": split_by_crlf(form["response_type"]),
"scope": form["scope"], "scope": form["scope"],
"token_endpoint_auth_method": form["token_endpoint_auth_method"] "token_endpoint_auth_method": form["token_endpoint_auth_method"],
} }
client.set_client_metadata(client_metadata) client.set_client_metadata(client_metadata)
if form['token_endpoint_auth_method'] == 'none': if form["token_endpoint_auth_method"] == "none":
client.client_secret = '' client.client_secret = ""
else: else:
client.client_secret = gen_salt(48) client.client_secret = gen_salt(48)
db.session.add(client) db.session.add(client)
db.session.commit() db.session.commit()
return redirect('/') return redirect("/")
@bp.route('/delete_client')
@bp.route("/delete_client")
def delete_client(): def delete_client():
user = current_user() user = current_user()
if not user: if not user:
return redirect('/') return redirect("/")
if user.id != 1: if user.id != 1:
return redirect('/') return redirect("/")
client_id = request.args.get('client_id') client_id = request.args.get("client_id")
client = OAuth2Client.query.filter_by(client_id=client_id).first() client = OAuth2Client.query.filter_by(client_id=client_id).first()
if client: if client:
db.session.delete(client) db.session.delete(client)
db.session.commit() db.session.commit()
return redirect('/') return redirect("/")
@bp.route('/oauth/authorize', methods=['GET', 'POST']) @bp.route("/oauth/authorize", methods=["GET", "POST"])
def authorize(): def authorize():
user = current_user() user = current_user()
# if user log status is not true (Auth server), then to log it in # if user log status is not true (Auth server), then to log it in
if not user: if not user:
return redirect(url_for('home.home', next=request.url)) return redirect(url_for("home.home", next=request.url))
if request.method == 'GET': if request.method == "GET":
try: try:
grant = authorization.get_consent_grant(end_user=user) grant = authorization.get_consent_grant(end_user=user)
except OAuth2Error as error: except OAuth2Error as error:
return error.error return error.error
return render_template('authorize.html', user=user, grant=grant) return render_template("authorize.html", user=user, grant=grant)
grant_user = user grant_user = user
return authorization.create_authorization_response(grant_user=grant_user) return authorization.create_authorization_response(grant_user=grant_user)
@bp.route('/oauth/token', methods=['POST']) @bp.route("/oauth/token", methods=["POST"])
def issue_token(): def issue_token():
return authorization.create_token_response() return authorization.create_token_response()
@bp.route('/oauth/revoke', methods=['POST']) @bp.route("/oauth/revoke", methods=["POST"])
def revoke_token(): def revoke_token():
return authorization.create_endpoint_response('revocation') return authorization.create_endpoint_response("revocation")
@bp.route('/api/me') @bp.route("/api/me")
@require_oauth(['profile', 'openid']) @require_oauth(["profile", "openid"])
def api_me(): def api_me():
user = current_token.user user = current_token.user
print(user.id, user.username) print(user.id, user.username, flush=True)
return jsonify(id=user.id, username=user.username, userInfo = {
email= f'{user.username}@login.hns.au', "id": user.id,
displayName=user.username+"/") "username": user.username,
"email": f"{user.username}@login.hns.au",
"displayName": user.username + "/",
"sub": user.id,
"name": user.username,
"given_name": user.username,
"family_name": user.username,
"nickname": user.username,
"preferred_username": user.username,
"profile": f"https://login.hns.au/u/{user.username}",
"picture": f"https://login.hns.au/u/{user.username}/avatar",
"website": f"https://{user.username}"
}
print(userInfo, flush=True)
# a = [
# "sub",
# "name",
# "given_name",
# "family_name",
# "middle_name",
# "nickname",
# "preferred_username",
# "profile",
# "picture",
# "website",
# "email",
# "email_verified",
# "gender",
# "birthdate",
# "zoneinfo",
# "locale",
# "phone_number",
# "phone_number_verified",
# "address",
# "updated_at",
# ]
return jsonify(userInfo)
@bp.route('/discovery')
@bp.route("/discovery")
def autodiscovery(): def autodiscovery():
host = request.host host = request.host
discovery = { discovery = {
"issuer": f"https://{host}/", "issuer": f"https://{host}/",
"authorization_endpoint": f"https://{host}/oauth/authorize", "authorization_endpoint": f"https://{host}/oauth/authorize",
"token_endpoint": f"https://{host}/oauth/token", "token_endpoint": f"https://{host}/oauth/token",
"userinfo_endpoint": f"https://{host}/api/me", "userinfo_endpoint": f"https://{host}/api/me",
"revocation_endpoint": f"https://{host}/oauth/revoke", "revocation_endpoint": f"https://{host}/oauth/revoke",
"response_types_supported": [ "response_types_supported": ["code"],
"code" "subject_types_supported": ["public"],
], "id_token_signing_alg_values_supported": ["RS256"],
"subject_types_supported": [ "scopes_supported": ["openid", "email", "profile"],
"public" "token_endpoint_auth_methods_supported": [
], "client_secret_basic",
"id_token_signing_alg_values_supported": [ "client_secret_post",
"RS256" ],
], "grant_types_supported": ["authorization_code"],
"scopes_supported": [ }
"openid",
"email",
"profile"
],
"token_endpoint_auth_methods_supported": [
"client_secret_basic",
"client_secret_post",
],
"grant_types_supported": [
"authorization_code"
]
}
return jsonify(discovery) return jsonify(discovery)
@bp.route("/favicon.png")
@bp.route('/favicon.png')
def favicon(): def favicon():
return send_from_directory('templates', 'favicon.png', mimetype='image/png') return send_from_directory("templates", "favicon.png", mimetype="image/png")
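Review bot: In `api_me`, the new `"sub": user.id` claim is emitted as an integer, while the OpenID Connect userinfo `sub` claim is a string; clients that compare it against the `sub` in the ID token or validate claim types will reject it, so use `"sub": str(user.id),`. The `email` claim is returned to any token accepted by `@require_oauth(["profile", "openid"])`, although `/discovery` advertises a separate `email` scope — gating it on the granted `email` scope (e.g. via `current_token.scope`) would keep the response to what the user consented to; the exact semantics of passing a list to `require_oauth` should be confirmed against the Authlib version in use. The two `print(..., flush=True)` calls write the user's identity and email to stdout on every userinfo request, which looks like leftover debugging and should be dropped or downgraded to a logger at debug level. The commented-out claim list above `return jsonify(userInfo)` is dead code and can be removed.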