commit cb10765ff0b482357f5af5daf19ea3ae2d69b66f Author: boyska Date: Fri Mar 18 00:35:54 2022 +0100 initial diff --git a/tresetter.py b/tresetter.py new file mode 100644 index 0000000..11b0d84 --- /dev/null +++ b/tresetter.py @@ -0,0 +1,173 @@ +from pathlib import Path +import string +import random +import logging +import json +import uuid +from subprocess import Popen, CalledProcessError + +import redis +from fastapi import FastAPI, HTTPException, Cookie +from fastapi.responses import JSONResponse +from fastapi.staticfiles import StaticFiles +from pydantic import BaseModel, BaseSettings + + +code_dir = Path(".") +redis_params = { + "host": "localhost", + # "password": "foobar", +} + + +class Settings(BaseSettings): + app_name: str = "tResetter" + validate_login_exe: str + change_password_exe: str + redis_params: dict = { + "host": "localhost" + } + expire_time: int = 60 * 20 + + +settings = Settings() +app = FastAPI() + +app.mount("/static", StaticFiles(directory=str(code_dir / "static"))) + +logger = logging.getLogger("server") + + +def create_session(content) -> str: + session_id = str(uuid.uuid4()) + set_session(session_id, content) + return session_id + + +def set_session(session_id: str, content) -> bool: + r = redis.Redis(**redis_params) + serialized = json.dumps(content) + ret = r.set(session_id, serialized) + r.expire(session_id, settings.expire_time) + + +def get_session(session_id: str, renew=True): + if session_id is None: + raise HTTPException(status_code=400) + r = redis.Redis(**redis_params) + serialized = r.get(session_id) + logger.error("%s", repr(serialized)) + if serialized is None: + raise HTTPException(status_code=401) + if renew: + r.expire(session_id, settings.expire_time) + return json.loads(serialized) + + +def delete_session(session_id: str): + r = redis.Redis(**redis_params) + r.delete(session_id) + + +class LoginData(BaseModel): + username: str + password: str + + +class ChangeData(BaseModel): + password: str + +class SuccessData(BaseModel): + success: bool = True + + +@app.get("/") +async def home(): + # XXX: read index.html + return "Ciao!" + + +def validate(username, password): + p = Popen( + [settings.validate_login_exe], + env={"VERIRY_USERNAME": username, "VERIFY_PASSWORD": password}, + ) + try: + p.communicate() + except CalledProcessError: + return False + + return p.returncode == 0 + + +@app.post("/login") +async def login(req: LoginData): + + ok = validate(req.username, req.password) + if not ok: + raise HTTPException(status_code=401, detail="Authentication error") + session_id = create_session( + { + "username": req.username, + } + ) + response = JSONResponse( + content={ + "status": "ok", + "username": req.username, + } + ) + response.set_cookie(key="session_id", value=session_id) + return response + + +@app.get("/whoami") +async def whoami(session_id: str = Cookie(None)): + session = get_session(session_id) + return JSONResponse(content={"username": session["username"]}) + + +@app.post("/logout") +async def logout(session_id: str = Cookie(None)): + get_session(session_id) + delete_session(session_id) + return "OKI" + + +def password_generate(): + symbols = list(string.ascii_lowercase) + list(string.digits) + return "".join(random.choices(symbols, k=10)) + + +@app.post("/generate") +async def generate(session_id: str = Cookie(None)): + session = get_session(session_id) + session["proposed_password"] = password_generate() + set_session(session_id, session) + + return JSONResponse(content={"password": session["proposed_password"]}) + + +@app.post("/change") +async def change(req: ChangeData, session_id: str = Cookie(None)): + session = get_session(session_id) + if "proposed_password" not in session: + raise HTTPException(status_code=400, detail="You must generate it first") + if req.password != session["proposed_password"]: + raise HTTPException(status_code=409) + + p = Popen( + [settings.change_password_exe], + env={"CHANGE_USERNAME": session["username"], "CHANGE_PASSWORD": req.password}, + ) + + try: + p.communicate() + except CalledProcessError: + fail = True + else: + fail = p.returncode != 0 + if fail: + return SuccessData(success=False) + + return SuccessData()