from pathlib import Path
import string
import random
import logging
import json
import uuid
from subprocess import Popen, CalledProcessError

import redis
from fastapi import FastAPI, HTTPException, Cookie
from fastapi.responses import JSONResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, BaseSettings

code_dir = Path(".")

# Keyword arguments handed to ``redis.Redis`` by the session helpers below.
# NOTE(review): ``Settings.redis_params`` has the same default but is not read
# by these helpers -- confirm which of the two is meant to be authoritative.
redis_params = {
    "host": "localhost",
    # "password": "foobar",
}


class Settings(BaseSettings):
    """Application configuration (pydantic ``BaseSettings`` model)."""

    app_name: str = "tResetter"
    # Executable run to check a username/password pair (required, no default).
    validate_login_exe: str
    # Executable run to apply a new password (required, no default).
    change_password_exe: str
    redis_params: dict = {
        "host": "localhost"
    }
    # Session lifetime in seconds (20 minutes).
    expire_time: int = 60 * 20


settings = Settings()
app = FastAPI()
app.mount("/static", StaticFiles(directory=str(code_dir / "static")))
logger = logging.getLogger("server")


def create_session(content) -> str:
    """Store *content* under a freshly generated UUID4 key and return that key."""
    session_id = str(uuid.uuid4())
    set_session(session_id, content)
    return session_id


def set_session(session_id: str, content) -> bool:
    """Write *content* as JSON under *session_id* with the configured expiry.

    Returns:
        True when Redis acknowledged the write, False otherwise.
    """
    r = redis.Redis(**redis_params)
    serialized = json.dumps(content)
    # Value and TTL go out in a single SET so a key can never be left behind
    # without an expiry if a separate EXPIRE call were to fail.
    return bool(r.set(session_id, serialized, ex=settings.expire_time))


def get_session(session_id: str, renew=True):
    """Load and decode the session stored under *session_id*.

    Args:
        session_id: Key of the session; ``None`` means no cookie was sent.
        renew: When true, reset the key's expiry to ``settings.expire_time``.

    Raises:
        HTTPException: 400 when *session_id* is ``None``; 401 when no value
            is stored under that key.
    """
    if session_id is None:
        raise HTTPException(status_code=400)
    r = redis.Redis(**redis_params)
    serialized = r.get(session_id)
    # Only log whether the lookup succeeded: the payload may contain a
    # generated password and must not end up in the log files.
    logger.debug("session lookup: %s", "miss" if serialized is None else "hit")
    if serialized is None:
        raise HTTPException(status_code=401)
    if renew:
        r.expire(session_id, settings.expire_time)
    return json.loads(serialized)


def delete_session(session_id: str):
    """Remove the session stored under *session_id*."""
    r = redis.Redis(**redis_params)
    r.delete(session_id)


class LoginData(BaseModel):
    """Request body of ``POST /login``."""

    username: str
    password: str


class ChangeData(BaseModel):
    """Request body of ``POST /change``."""

    password: str


class SuccessData(BaseModel):
    """Response body carrying a single success flag (defaults to true)."""

    success: bool = True


@app.get("/")
async def home():
    """Placeholder landing endpoint."""
    # XXX: read index.html
    return "Ciao!"
def _run_with_env(exe, env) -> bool:
    """Run *exe* with exactly *env* as its environment; True on exit status 0.

    Failure to start the process is logged and reported as False.
    """
    try:
        p = Popen([exe], env=env)
    except (OSError, ValueError):
        # OSError: executable missing or not runnable. ValueError: an
        # environment value is not acceptable to the OS (e.g. embedded NUL),
        # which matters because the values originate from request bodies.
        logger.exception("could not start %s", exe)
        return False
    p.communicate()
    return p.returncode == 0


def validate(username, password):
    """Check *username*/*password* by running ``settings.validate_login_exe``.

    The credentials are passed to the helper through environment variables.

    Returns:
        True when the helper exits with status 0, False otherwise.
    """
    return _run_with_env(
        settings.validate_login_exe,
        {
            "VERIFY_USERNAME": username,
            # Misspelled key used by earlier versions of this file; still
            # exported so a helper script reading it keeps working.
            "VERIRY_USERNAME": username,
            "VERIFY_PASSWORD": password,
        },
    )


@app.post("/login")
async def login(req: LoginData):
    """Authenticate the user and open a session.

    Raises:
        HTTPException: 401 when the credentials are rejected.
    """
    ok = validate(req.username, req.password)
    if not ok:
        raise HTTPException(status_code=401, detail="Authentication error")
    session_id = create_session(
        {
            "username": req.username,
        }
    )
    response = JSONResponse(
        content={
            "status": "ok",
            "username": req.username,
        }
    )
    # HttpOnly keeps the session identifier out of reach of page scripts.
    response.set_cookie(key="session_id", value=session_id, httponly=True)
    return response


@app.get("/whoami")
async def whoami(session_id: str = Cookie(None)):
    """Return the username stored in the caller's session."""
    session = get_session(session_id)
    return JSONResponse(content={"username": session["username"]})


@app.post("/logout")
async def logout(session_id: str = Cookie(None)):
    """Delete the caller's session after checking that it exists."""
    get_session(session_id)
    delete_session(session_id)
    return "OKI"


def password_generate(length: int = 10):
    """Return a random password of *length* lowercase letters and digits."""
    # ``secrets`` draws from the OS CSPRNG; ``random`` is predictable and
    # must not be used for credentials.
    import secrets

    symbols = string.ascii_lowercase + string.digits
    return "".join(secrets.choice(symbols) for _ in range(length))


@app.post("/generate")
async def generate(session_id: str = Cookie(None)):
    """Generate a password proposal, remember it in the session, return it."""
    session = get_session(session_id)
    session["proposed_password"] = password_generate()
    set_session(session_id, session)
    return JSONResponse(content={"password": session["proposed_password"]})


@app.post("/change")
async def change(req: ChangeData, session_id: str = Cookie(None)):
    """Apply the previously proposed password via ``settings.change_password_exe``.

    Raises:
        HTTPException: 400 when no password was generated in this session;
            409 when the submitted password differs from the proposal.

    Returns:
        ``SuccessData`` whose ``success`` flag reflects the helper's outcome.
    """
    session = get_session(session_id)
    if "proposed_password" not in session:
        raise HTTPException(status_code=400, detail="You must generate it first")
    if req.password != session["proposed_password"]:
        raise HTTPException(status_code=409)
    changed = _run_with_env(
        settings.change_password_exe,
        {"CHANGE_USERNAME": session["username"], "CHANGE_PASSWORD": req.password},
    )
    if not changed:
        return SuccessData(success=False)
    # The proposal is now the live password: do not keep it in the session
    # store in plaintext for the remainder of the session lifetime.
    del session["proposed_password"]
    set_session(session_id, session)
    return SuccessData()