feat: Add HIP13 records to user info
All checks were successful
Build Docker / Build Docker (push) Successful in 24s

This commit is contained in:
Nathan Woodburn 2024-06-15 15:47:22 +10:00
parent b67c1576e1
commit 4965bda4da
Signed by: nathanwoodburn
GPG Key ID: 203B000478AD0EF1

View File

@ -22,6 +22,16 @@ from eth_account import Account
bp = Blueprint("home", __name__) bp = Blueprint("home", __name__)
openSeaAPIKey = os.getenv("OPENSEA_API_KEY") openSeaAPIKey = os.getenv("OPENSEA_API_KEY")
restricted_keys = [
"id",
"uid",
"username",
"sub",
"email_verified",
"roles",
"role",
"admin"
]
if not os.path.exists("website/avatars"): if not os.path.exists("website/avatars"):
os.makedirs("website/avatars") os.makedirs("website/avatars")
@ -36,6 +46,77 @@ def current_user():
def split_by_crlf(s): def split_by_crlf(s):
return [v for v in s.splitlines() if v] return [v for v in s.splitlines() if v]
def get_idns_records(domain:str) -> list:
query = dns.message.make_query(domain, dns.rdatatype.TXT)
dns_request = query.to_wire()
# Send the DNS query over HTTPS
response = requests.post('https://hnsdoh.com/dns-query', data=dns_request, headers={'Content-Type': 'application/dns-message'})
# Parse the DNS response
dns_response = dns.message.from_wire(response.content)
# Loop over TXT records and look for profile
idns_records = []
for record in dns_response.answer:
if record.rdtype == dns.rdatatype.TXT:
for txt in record:
txt_value:str = txt.to_text().strip('"')
if txt_value.startswith("IDNS1"):
print(txt_value)
idns = txt_value.removeprefix("IDNS1 ")
idns = idns.split(" ")
for r in idns:
idns_records.append(r)
return idns_records
def get_user_info(user:User) -> dict:
picture = f"https://login.hns.au/u/{user.username}/avatar.png"
if user.profile_picture:
picture = user.profile_picture
userInfo = {
"id": user.id,
"uid": user.id,
"username": user.username,
"email": f"{user.username}@login.hns.au",
"displayName": user.username + "/",
"sub": user.id,
"name": user.username,
"given_name": user.username,
"family_name": user.username,
"nickname": user.username,
"preferred_username": user.username,
"profile": f"https://login.hns.au/u/{user.username}",
"picture": picture,
"website": f"https://{user.username}",
"email_verified": True
}
idns_info = get_idns_records(user.username)
for record in idns_info:
type = record.split(":")[0]
content = record.split(":")
if len(content) > 2:
content = ":".join(content[1:])
else:
content = content[1]
key = content.split("=")[0].lower()
value = content.split("=")[1]
if type == "profile":
if key == "avatar":
userInfo["picture"] = value
elif key == "email":
userInfo["email"] = value
userInfo["email_verified"] = False
elif key == "name":
userInfo["name"] = value
userInfo["nickname"] = value
userInfo["displayName"] = value
else:
if key in restricted_keys:
continue
userInfo[key] = value
return userInfo
@bp.route("/", methods=("GET", "POST")) @bp.route("/", methods=("GET", "POST"))
def home(): def home():
@ -227,27 +308,8 @@ def txtLoginDomain(domain):
# Get uuid # Get uuid
uuid = session["uuid"] uuid = session["uuid"]
query = dns.message.make_query(domain, dns.rdatatype.TXT) idns_records = get_idns_records(domain)
dns_request = query.to_wire()
# Send the DNS query over HTTPS
response = requests.post('https://hnsdoh.com/dns-query', data=dns_request, headers={'Content-Type': 'application/dns-message'})
# Parse the DNS response
dns_response = dns.message.from_wire(response.content)
# Loop over TXT records and look for profile avatar
idns_records = []
for record in dns_response.answer:
if record.rdtype == dns.rdatatype.TXT:
for txt in record:
txt_value:str = txt.to_text().strip('"')
if txt_value.startswith("IDNS1"):
print(txt_value)
idns = txt_value.removeprefix("IDNS1 ")
idns = idns.split(" ")
for r in idns:
idns_records.append(r)
for record in idns_records: for record in idns_records:
print(record) print(record)
@ -377,35 +439,6 @@ def revoke_token():
return authorization.create_endpoint_response("revocation") return authorization.create_endpoint_response("revocation")
@bp.route("/api/me")
@require_oauth(["profile", "openid"])
def api_me():
user = current_token.user
picture = f"https://login.hns.au/u/{user.username}/avatar.png"
if user.profile_picture:
picture = user.profile_picture
userInfo = {
"id": user.id,
"uid": user.id,
"username": user.username,
"email": f"{user.username}@login.hns.au",
"displayName": user.username + "/",
"sub": user.id,
"name": user.username,
"given_name": user.username,
"family_name": user.username,
"nickname": user.username,
"preferred_username": user.username,
"profile": f"https://login.hns.au/u/{user.username}",
"picture": picture,
"website": f"https://{user.username}",
"email_verified": True
}
return jsonify(userInfo)
@bp.route("/discovery") @bp.route("/discovery")
def autodiscovery(): def autodiscovery():
host = request.host host = request.host
@ -428,34 +461,20 @@ def autodiscovery():
return jsonify(discovery) return jsonify(discovery)
@bp.route("/api/me")
@require_oauth(["profile", "openid"])
def api_me():
user = current_token.user
return jsonify(get_user_info(user))
@bp.route("/u/<username>") @bp.route("/u/<username>")
def profile(username): def profile(username):
user = User.query.filter_by(username=username).first() user = User.query.filter_by(username=username).first()
if not user: if not user:
return jsonify({"error": "User not found"}) return jsonify({"error": "User not found"})
picture = f"https://login.hns.au/u/{user.username}/avatar.png"
if user.profile_picture: return jsonify(get_user_info(user))
picture = user.profile_picture
userInfo = {
"id": user.id,
"uid": user.id,
"username": user.username,
"email": f"{user.username}@login.hns.au",
"displayName": user.username + "/",
"sub": user.id,
"name": user.username,
"given_name": user.username,
"family_name": user.username,
"nickname": user.username,
"preferred_username": user.username,
"profile": f"https://login.hns.au/u/{user.username}",
"picture": picture,
"website": f"https://{user.username}",
"email_verified": True
}
return jsonify(userInfo)
@bp.route("/u/<username>/avatar.png") @bp.route("/u/<username>/avatar.png")
def avatar(username): def avatar(username):