tresetter/tresetter.py

220 lines
5.1 KiB
Python
Raw Normal View History

2022-03-18 00:35:54 +01:00
from pathlib import Path
import string
import random
import logging
import json
import uuid
from subprocess import Popen, CalledProcessError
2022-03-22 13:07:03 +01:00
from typing import Optional
2022-03-18 00:35:54 +01:00
import redis
2022-03-22 13:07:03 +01:00
from fastapi import FastAPI, APIRouter, HTTPException, Cookie, Request
2022-03-22 19:05:55 +01:00
from fastapi.responses import Response, JSONResponse, RedirectResponse
2022-03-18 00:35:54 +01:00
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, BaseSettings
code_dir = Path(".")
class Settings(BaseSettings):
app_name: str = "tResetter"
validate_login_exe: str
change_password_exe: str
2022-03-22 13:07:03 +01:00
redis_host: str = "localhost"
2022-03-18 00:35:54 +01:00
expire_time: int = 60 * 20
2022-03-22 13:07:03 +01:00
root_path: Optional[str]
root_prefix: str = "/tresetter"
@property
def redis_params(self):
return {"host": self.redis_host}
2022-03-18 00:35:54 +01:00
settings = Settings()
2022-03-22 13:07:03 +01:00
kwargs = {}
if settings.root_path:
kwargs["root_path"] = settings.root_path
app = FastAPI(**kwargs)
router = APIRouter(prefix=settings.root_prefix)
2022-03-18 00:35:54 +01:00
2022-03-22 13:07:03 +01:00
app.mount(
settings.root_prefix + "/s",
StaticFiles(directory=str(code_dir / "static")),
name="static",
)
2022-03-18 00:35:54 +01:00
logger = logging.getLogger("server")
2022-03-22 13:07:03 +01:00
# Session {{{
class SessionNotFoundException(HTTPException):
def __init__(self):
super().__init__(status_code=401)
2022-03-18 00:35:54 +01:00
def create_session(content) -> str:
session_id = str(uuid.uuid4())
set_session(session_id, content)
return session_id
def set_session(session_id: str, content) -> bool:
2022-03-22 13:07:03 +01:00
r = redis.Redis(**settings.redis_params)
2022-03-18 00:35:54 +01:00
serialized = json.dumps(content)
ret = r.set(session_id, serialized)
r.expire(session_id, settings.expire_time)
def get_session(session_id: str, renew=True):
if session_id is None:
2022-03-22 13:07:03 +01:00
raise SessionNotFoundException()
r = redis.Redis(**settings.redis_params)
2022-03-18 00:35:54 +01:00
serialized = r.get(session_id)
logger.error("%s", repr(serialized))
if serialized is None:
2022-03-22 13:07:03 +01:00
raise SessionNotFoundException()
2022-03-18 00:35:54 +01:00
if renew:
r.expire(session_id, settings.expire_time)
return json.loads(serialized)
def delete_session(session_id: str):
2022-03-22 13:07:03 +01:00
r = redis.Redis(**settings.redis_params)
2022-03-18 00:35:54 +01:00
r.delete(session_id)
2022-03-22 13:07:03 +01:00
# end session }}}
# Models {{{
2022-03-22 19:05:55 +01:00
class UserData(BaseModel):
2022-03-18 00:35:54 +01:00
username: str
class ChangeData(BaseModel):
password: str
2022-03-22 13:07:03 +01:00
2022-03-22 19:05:55 +01:00
class LoginData(UserData, ChangeData):
pass
2022-03-18 00:35:54 +01:00
class SuccessData(BaseModel):
success: bool = True
2022-03-22 13:07:03 +01:00
# end Models }}}
2022-03-18 00:35:54 +01:00
2022-03-22 13:07:03 +01:00
# external commands {{{
2022-03-18 00:35:54 +01:00
def validate(username, password):
p = Popen(
[settings.validate_login_exe],
env={"VERIRY_USERNAME": username, "VERIFY_PASSWORD": password},
)
try:
p.communicate()
except CalledProcessError:
return False
return p.returncode == 0
2022-03-22 13:07:03 +01:00
def password_generate():
symbols = list(string.ascii_lowercase) + list(string.digits)
return "".join(random.choices(symbols, k=10))
2022-03-22 19:05:55 +01:00
def change_password(username: str, new_password: str) -> bool:
p = Popen(
[settings.change_password_exe],
env={"CHANGE_USERNAME": username, "CHANGE_PASSWORD": new_password},
)
try:
p.communicate()
except CalledProcessError:
return False
if p.returncode != 0:
return False
return True
2022-03-22 13:07:03 +01:00
# end external commands }}}
@router.get("/")
async def home(request: Request, session_id: str = Cookie(None)):
2022-03-22 19:05:55 +01:00
"""redirects to the user to the home"""
2022-03-22 13:07:03 +01:00
try:
get_session(session_id)
except SessionNotFoundException:
return RedirectResponse(app.url_path_for("static", path="login.html"))
return RedirectResponse(app.url_path_for("static", path="change.html"))
@router.post("/login", tags=["session"])
2022-03-18 00:35:54 +01:00
async def login(req: LoginData):
2022-03-22 19:05:55 +01:00
"""
performs login
"""
2022-03-18 00:35:54 +01:00
ok = validate(req.username, req.password)
if not ok:
raise HTTPException(status_code=401, detail="Authentication error")
session_id = create_session(
{
"username": req.username,
}
)
2022-03-22 19:05:55 +01:00
response = Response()
2022-03-18 00:35:54 +01:00
response.set_cookie(key="session_id", value=session_id)
return response
2022-03-22 13:07:03 +01:00
@router.get("/whoami", tags=["session"])
2022-03-18 00:35:54 +01:00
async def whoami(session_id: str = Cookie(None)):
2022-03-22 19:05:55 +01:00
"""Confirm login information"""
2022-03-18 00:35:54 +01:00
session = get_session(session_id)
2022-03-22 19:05:55 +01:00
return UserData(username=session["username"])
2022-03-18 00:35:54 +01:00
2022-03-22 13:07:03 +01:00
@router.post("/logout", tags=["session"])
2022-03-22 19:05:55 +01:00
async def logout(session_id: str = Cookie(None)) -> BaseModel:
2022-03-18 00:35:54 +01:00
get_session(session_id)
delete_session(session_id)
2022-03-22 19:05:55 +01:00
return BaseModel()
2022-03-18 00:35:54 +01:00
2022-03-22 13:07:03 +01:00
@router.post("/generate", tags=["password"])
2022-03-18 00:35:54 +01:00
async def generate(session_id: str = Cookie(None)):
session = get_session(session_id)
session["proposed_password"] = password_generate()
set_session(session_id, session)
2022-03-22 19:05:55 +01:00
return ChangeData(password=session["proposed_password"])
2022-03-18 00:35:54 +01:00
2022-03-22 13:07:03 +01:00
@router.post("/change", tags=["password"])
2022-03-22 19:05:55 +01:00
async def change(req: ChangeData, session_id: str = Cookie(None)) -> SuccessData:
2022-03-18 00:35:54 +01:00
session = get_session(session_id)
if "proposed_password" not in session:
raise HTTPException(status_code=400, detail="You must generate it first")
if req.password != session["proposed_password"]:
raise HTTPException(status_code=409)
2022-03-22 19:05:55 +01:00
success = change_password(session["username"], req.password)
return SuccessData(success=success)
2022-03-22 13:07:03 +01:00
app.include_router(router)
# vim: set fdm=marker: