from pathlib import Path import string import random import logging import json import uuid from subprocess import Popen, CalledProcessError from typing import Optional import redis from fastapi import FastAPI, APIRouter, HTTPException, Cookie, Request from fastapi.responses import Response, JSONResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, BaseSettings code_dir = Path(".") class Settings(BaseSettings): app_name: str = "tResetter" validate_login_exe: str change_password_exe: str redis_host: str = "localhost" expire_time: int = 60 * 20 root_path: Optional[str] root_prefix: str = "/tresetter" @property def redis_params(self): return {"host": self.redis_host} settings = Settings() kwargs = {} if settings.root_path: kwargs["root_path"] = settings.root_path app = FastAPI(**kwargs) router = APIRouter(prefix=settings.root_prefix) app.mount( settings.root_prefix + "/s", StaticFiles(directory=str(code_dir / "static")), name="static", ) logger = logging.getLogger("server") # Session {{{ class SessionNotFoundException(HTTPException): def __init__(self): super().__init__(status_code=401) def create_session(content) -> str: session_id = str(uuid.uuid4()) set_session(session_id, content) return session_id def set_session(session_id: str, content) -> bool: r = redis.Redis(**settings.redis_params) serialized = json.dumps(content) ret = r.set(session_id, serialized) r.expire(session_id, settings.expire_time) def get_session(session_id: str, renew=True): if session_id is None: raise SessionNotFoundException() r = redis.Redis(**settings.redis_params) serialized = r.get(session_id) logger.error("%s", repr(serialized)) if serialized is None: raise SessionNotFoundException() if renew: r.expire(session_id, settings.expire_time) return json.loads(serialized) def delete_session(session_id: str): r = redis.Redis(**settings.redis_params) r.delete(session_id) # end session }}} # Models {{{ class UserData(BaseModel): username: str class ChangeData(BaseModel): password: str class LoginData(UserData, ChangeData): pass class SuccessData(BaseModel): success: bool = True # end Models }}} # external commands {{{ def validate(username, password): p = Popen( [settings.validate_login_exe], env={"VERIRY_USERNAME": username, "VERIFY_PASSWORD": password}, ) try: p.communicate() except CalledProcessError: return False return p.returncode == 0 def password_generate(): symbols = list(string.ascii_lowercase) + list(string.digits) return "".join(random.choices(symbols, k=10)) def change_password(username: str, new_password: str) -> bool: p = Popen( [settings.change_password_exe], env={"CHANGE_USERNAME": username, "CHANGE_PASSWORD": new_password}, ) try: p.communicate() except CalledProcessError: return False if p.returncode != 0: return False return True # end external commands }}} @router.get("/") async def home(request: Request, session_id: str = Cookie(None)): """redirects to the user to the home""" try: get_session(session_id) except SessionNotFoundException: return RedirectResponse(app.url_path_for("static", path="login.html")) return RedirectResponse(app.url_path_for("static", path="change.html")) @router.post("/login", tags=["session"]) async def login(req: LoginData): """ performs login """ ok = validate(req.username, req.password) if not ok: raise HTTPException(status_code=401, detail="Authentication error") session_id = create_session( { "username": req.username, } ) response = Response() response.set_cookie(key="session_id", value=session_id) return response @router.get("/whoami", tags=["session"]) async def whoami(session_id: str = Cookie(None)): """Confirm login information""" session = get_session(session_id) return UserData(username=session["username"]) @router.post("/logout", tags=["session"]) async def logout(session_id: str = Cookie(None)) -> BaseModel: get_session(session_id) delete_session(session_id) return BaseModel() @router.post("/generate", tags=["password"]) async def generate(session_id: str = Cookie(None)): session = get_session(session_id) session["proposed_password"] = password_generate() set_session(session_id, session) return ChangeData(password=session["proposed_password"]) @router.post("/change", tags=["password"]) async def change(req: ChangeData, session_id: str = Cookie(None)) -> SuccessData: session = get_session(session_id) if "proposed_password" not in session: raise HTTPException(status_code=400, detail="You must generate it first") if req.password != session["proposed_password"]: raise HTTPException(status_code=409) success = change_password(session["username"], req.password) return SuccessData(success=success) app.include_router(router) # vim: set fdm=marker: