123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- from pathlib import Path
- import string
- import random
- import logging
- import json
- import uuid
- from subprocess import Popen, CalledProcessError, check_output
- from typing import Optional
- import hashlib
- import base64
- import redis
- from fastapi import FastAPI, APIRouter, HTTPException, Cookie, Request
- from fastapi.responses import Response, RedirectResponse
- from fastapi.staticfiles import StaticFiles
- from pydantic import BaseModel, BaseSettings
- code_dir = Path(".")
- class Settings(BaseSettings):
- app_name: str = "tResetter"
- validate_login_exe: str
- change_password_exe: str
- generate_password: Optional[str]
- redis_host: str = "localhost"
- expire_time: int = 60 * 20
- root_path: Optional[str]
- root_prefix: str = "/tresetter"
- class Config:
- pass
- @property
- def redis_params(self):
- return {"host": self.redis_host}
- settings = Settings()
- kwargs = {}
- if settings.root_path:
- kwargs["root_path"] = settings.root_path
- app = FastAPI(**kwargs)
- router = APIRouter(prefix=settings.root_prefix)
- app.mount(
- settings.root_prefix + "/s",
- StaticFiles(directory=str(code_dir / "static")),
- name="static",
- )
- logger = logging.getLogger("server")
- # Session {{{
- class SessionNotFoundException(HTTPException):
- def __init__(self):
- super().__init__(status_code=401)
- def create_session(content) -> str:
- session_id = str(uuid.uuid4())
- set_session(session_id, content)
- return session_id
- def set_session(session_id: str, content) -> bool:
- r = redis.Redis(**settings.redis_params)
- serialized = json.dumps(content)
- ret = r.set(session_id, serialized)
- r.expire(session_id, settings.expire_time)
- def get_session(session_id: str, renew=True):
- if session_id is None:
- raise SessionNotFoundException()
- r = redis.Redis(**settings.redis_params)
- serialized = r.get(session_id)
- logger.error("%s", repr(serialized))
- if serialized is None:
- raise SessionNotFoundException()
- if renew:
- r.expire(session_id, settings.expire_time)
- return json.loads(serialized)
- def delete_session(session_id: str):
- r = redis.Redis(**settings.redis_params)
- r.delete(session_id)
- # end session }}}
- # Models {{{
- class UserData(BaseModel):
- username: str
- class ChangeData(BaseModel):
- password: str
- class LoginData(UserData, ChangeData):
- pass
- class SuccessData(BaseModel):
- success: bool = True
- # end Models }}}
- # external commands {{{
- def validate(username, password):
- p = Popen(
- [settings.validate_login_exe],
- env={"VERIRY_USERNAME": username, "VERIFY_PASSWORD": password},
- )
- try:
- p.communicate()
- except CalledProcessError:
- return False
- return p.returncode == 0
- def password_generate() -> str:
- if settings.generate_password:
- s = check_output([settings.generate_password], encoding='utf8')
- assert type(s) is str
- return s.strip()
- else:
- symbols = list(string.ascii_lowercase) + list(string.digits)
- return "".join(random.choices(symbols, k=10))
- def change_password(username: str, new_password: str) -> bool:
- p = Popen(
- [settings.change_password_exe],
- env={"CHANGE_USERNAME": username, "CHANGE_PASSWORD": new_password},
- )
- try:
- p.communicate()
- except CalledProcessError:
- return False
- if p.returncode != 0:
- return False
- return True
- # end external commands }}}
- @router.get("/")
- async def home(request: Request, session_id: str = Cookie(None)):
- """redirects to the user to the home"""
- try:
- get_session(session_id)
- except SessionNotFoundException:
- return RedirectResponse(app.url_path_for("static", path="login.html"))
- return RedirectResponse(app.url_path_for("static", path="change.html"))
- @router.post("/login", tags=["session"])
- async def login(req: LoginData):
- """
- performs login
- """
- ok = validate(req.username, req.password)
- if not ok:
- raise HTTPException(status_code=401, detail="Authentication error")
- session_id = create_session(
- {
- "username": req.username,
- }
- )
- response = Response()
- response.set_cookie(key="session_id", value=session_id)
- return response
- @router.get("/whoami", tags=["session"])
- async def whoami(session_id: str = Cookie(None)):
- """Confirm login information"""
- session = get_session(session_id)
- return UserData(username=session["username"])
- @router.post("/logout", tags=["session"])
- async def logout(session_id: str = Cookie(None)) -> BaseModel:
- get_session(session_id)
- delete_session(session_id)
- return BaseModel()
- KDF_SALT_SIZE = 16
- def kdf_gen(password, salt=None) -> str:
- if salt is None:
- salt = random.randbytes(KDF_SALT_SIZE)
- if hasattr(password, 'encode'):
- password = password.encode('utf8')
- raw = hashlib.scrypt(password, n=2, r=1, p=1, salt=salt)
- with_salt = salt + raw
- return base64.b64encode(with_salt).decode('ascii')
- def kdf_get_salt(hashed: str):
- hashed_str = hashed.decode('ascii') if hasattr(hashed, 'decode') else hashed
- with_salt = base64.b64decode(hashed_str)
- salt = with_salt[:KDF_SALT_SIZE]
- return salt
- def kdf_verify(hashed: str, password: str) -> bool:
- salt = kdf_get_salt(hashed)
- hashed2 = kdf_gen(password, salt=salt)
- return hashed == hashed2
- @router.post("/generate", tags=["password"])
- async def generate(session_id: str = Cookie(None)):
- session = get_session(session_id)
- proposed_password = password_generate()
- session["proposed_password_hash"] = kdf_gen(proposed_password)
- set_session(session_id, session)
- return ChangeData(password=proposed_password)
- @router.post("/change", tags=["password"])
- async def change(req: ChangeData, session_id: str = Cookie(None)) -> SuccessData:
- session = get_session(session_id)
- if "proposed_password_hash" not in session:
- raise HTTPException(status_code=400, detail="You must generate it first")
- hashed = session["proposed_password_hash"]
- if not kdf_verify(hashed, req.password):
- raise HTTPException(status_code=409)
- delete_session(session_id)
- success = change_password(session["username"], req.password)
- return SuccessData(success=success)
- app.include_router(router)
- # vim: set fdm=marker:
|