balance-application/service/auth_service.py
2024-09-10 17:59:43 +09:00

89 lines
1.9 KiB
Python

import base64, psycopg2
from fastapi import Request
from pydantic import BaseModel
from util.config import conn_param
class AuthData:
name: str
username: str
password: str
salt: str
def __init__(self, name: str, username: str, password: str, salt: str):
self.name = name
self.username = username
self.password = password
self.salt = salt
class Credential(BaseModel):
username: str
password: str
class AuthService:
def __init__(self):
self._conn = psycopg2.connect(conn_param)
def create(self, data: AuthData):
cur = self._conn.cursor()
try:
if data.username == "" or data.password == "":
raise ValueError("username or password must not be null")
cur.execute(
"insert into account (name, username, password, salt) values (%s, %s, %s, %s)",
(data.name, data.username, data.password, data.salt)
)
self._conn.commit()
except:
self._conn.rollback()
raise RuntimeError("create account failed")
finally:
cur.close()
self._conn.close()
def read(self, username: str):
cur = self._conn.cursor()
cur.execute("select * from account where username = %s;", (username, ))
data = cur.fetchone()
if data == None:
return None
cur.close()
self._conn.close()
return AuthData(
name = data[0],
username = data[1],
password = data[2],
salt = data[3]
)
def get_data(self, req: Request):
raw = req.headers.get("Authorization")
if raw == None:
return None
raw_token = raw.removeprefix("Basic ").encode("ascii")
token = base64.b64decode(raw_token)
data = token.decode("utf-8").split(":")
return {
"username": data[0],
"password": data[1]
}
def check_auth(self, req: Request) -> bool:
data = self.get_data(req)
if data == None:
return False
acc = self.read(data["username"])
if acc == None:
return False
if acc.username == data["username"] and acc.password == data["password"]:
return True
return False