tresetter.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229
  1. from pathlib import Path
  2. import string
  3. import random
  4. import logging
  5. import json
  6. import uuid
  7. from subprocess import Popen, CalledProcessError, check_output
  8. from typing import Optional
  9. import redis
  10. from fastapi import FastAPI, APIRouter, HTTPException, Cookie, Request
  11. from fastapi.responses import Response, JSONResponse, RedirectResponse
  12. from fastapi.staticfiles import StaticFiles
  13. from pydantic import BaseModel, BaseSettings
  14. code_dir = Path(".")
  15. class Settings(BaseSettings):
  16. app_name: str = "tResetter"
  17. validate_login_exe: str
  18. change_password_exe: str
  19. generate_password: Optional[str]
  20. redis_host: str = "localhost"
  21. expire_time: int = 60 * 20
  22. root_path: Optional[str]
  23. root_prefix: str = "/tresetter"
  24. class Config:
  25. pass
  26. @property
  27. def redis_params(self):
  28. return {"host": self.redis_host}
  29. settings = Settings()
  30. kwargs = {}
  31. if settings.root_path:
  32. kwargs["root_path"] = settings.root_path
  33. app = FastAPI(**kwargs)
  34. router = APIRouter(prefix=settings.root_prefix)
  35. app.mount(
  36. settings.root_prefix + "/s",
  37. StaticFiles(directory=str(code_dir / "static")),
  38. name="static",
  39. )
  40. logger = logging.getLogger("server")
  41. # Session {{{
  42. class SessionNotFoundException(HTTPException):
  43. def __init__(self):
  44. super().__init__(status_code=401)
  45. def create_session(content) -> str:
  46. session_id = str(uuid.uuid4())
  47. set_session(session_id, content)
  48. return session_id
  49. def set_session(session_id: str, content) -> bool:
  50. r = redis.Redis(**settings.redis_params)
  51. serialized = json.dumps(content)
  52. ret = r.set(session_id, serialized)
  53. r.expire(session_id, settings.expire_time)
  54. def get_session(session_id: str, renew=True):
  55. if session_id is None:
  56. raise SessionNotFoundException()
  57. r = redis.Redis(**settings.redis_params)
  58. serialized = r.get(session_id)
  59. logger.error("%s", repr(serialized))
  60. if serialized is None:
  61. raise SessionNotFoundException()
  62. if renew:
  63. r.expire(session_id, settings.expire_time)
  64. return json.loads(serialized)
  65. def delete_session(session_id: str):
  66. r = redis.Redis(**settings.redis_params)
  67. r.delete(session_id)
  68. # end session }}}
  69. # Models {{{
  70. class UserData(BaseModel):
  71. username: str
  72. class ChangeData(BaseModel):
  73. password: str
  74. class LoginData(UserData, ChangeData):
  75. pass
  76. class SuccessData(BaseModel):
  77. success: bool = True
  78. # end Models }}}
  79. # external commands {{{
  80. def validate(username, password):
  81. p = Popen(
  82. [settings.validate_login_exe],
  83. env={"VERIRY_USERNAME": username, "VERIFY_PASSWORD": password},
  84. )
  85. try:
  86. p.communicate()
  87. except CalledProcessError:
  88. return False
  89. return p.returncode == 0
  90. def password_generate() -> str:
  91. if settings.generate_password:
  92. s = check_output([settings.generate_password], encoding='utf8')
  93. assert type(s) is str
  94. return s.strip()
  95. else:
  96. symbols = list(string.ascii_lowercase) + list(string.digits)
  97. return "".join(random.choices(symbols, k=10))
  98. def change_password(username: str, new_password: str) -> bool:
  99. p = Popen(
  100. [settings.change_password_exe],
  101. env={"CHANGE_USERNAME": username, "CHANGE_PASSWORD": new_password},
  102. )
  103. try:
  104. p.communicate()
  105. except CalledProcessError:
  106. return False
  107. if p.returncode != 0:
  108. return False
  109. return True
  110. # end external commands }}}
  111. @router.get("/")
  112. async def home(request: Request, session_id: str = Cookie(None)):
  113. """redirects to the user to the home"""
  114. try:
  115. get_session(session_id)
  116. except SessionNotFoundException:
  117. return RedirectResponse(app.url_path_for("static", path="login.html"))
  118. return RedirectResponse(app.url_path_for("static", path="change.html"))
  119. @router.post("/login", tags=["session"])
  120. async def login(req: LoginData):
  121. """
  122. performs login
  123. """
  124. ok = validate(req.username, req.password)
  125. if not ok:
  126. raise HTTPException(status_code=401, detail="Authentication error")
  127. session_id = create_session(
  128. {
  129. "username": req.username,
  130. }
  131. )
  132. response = Response()
  133. response.set_cookie(key="session_id", value=session_id)
  134. return response
  135. @router.get("/whoami", tags=["session"])
  136. async def whoami(session_id: str = Cookie(None)):
  137. """Confirm login information"""
  138. session = get_session(session_id)
  139. return UserData(username=session["username"])
  140. @router.post("/logout", tags=["session"])
  141. async def logout(session_id: str = Cookie(None)) -> BaseModel:
  142. get_session(session_id)
  143. delete_session(session_id)
  144. return BaseModel()
  145. @router.post("/generate", tags=["password"])
  146. async def generate(session_id: str = Cookie(None)):
  147. session = get_session(session_id)
  148. session["proposed_password"] = password_generate()
  149. set_session(session_id, session)
  150. return ChangeData(password=session["proposed_password"])
  151. @router.post("/change", tags=["password"])
  152. async def change(req: ChangeData, session_id: str = Cookie(None)) -> SuccessData:
  153. session = get_session(session_id)
  154. if "proposed_password" not in session:
  155. raise HTTPException(status_code=400, detail="You must generate it first")
  156. if req.password != session["proposed_password"]:
  157. raise HTTPException(status_code=409)
  158. success = change_password(session["username"], req.password)
  159. return SuccessData(success=success)
  160. app.include_router(router)
  161. # vim: set fdm=marker: