#!/usr/bin/python3
"""Carichello: a small zenity-driven GUI helper that uploads a file to a
configured storage backend (arkiwi.org via rclone/curl FTP, or archive.org
S3) and notifies the user via clipboard, dialog and e-mail."""
import datetime
import sys
import re
import json
from base64 import b64encode
from pathlib import Path
from argparse import ArgumentParser
import subprocess
import tempfile
import logging
import random
import contextlib

import requests


def setup_logging() -> Path:
    """Create a per-run logfile under ~/.local/share/carichello/ and point
    the root logger at it.

    Returns the path of the created logfile.
    """
    logdir = Path("~/.local/share/carichello/").expanduser()
    logdir.mkdir(exist_ok=True, parents=True)
    today = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M")
    # delete=False: the file must outlive this handle, because on error the
    # whole log is read back and mailed (see Carichello._send_log).
    logfile = tempfile.NamedTemporaryFile(
        dir=logdir, prefix=f"{today}-", suffix=".txt", delete=False
    )
    logging.basicConfig(
        filename=logfile.name,
        filemode="a",
        format="%(asctime)s:%(levelname)s:%(message)s",
        datefmt="%Y-%m-%d_%H:%M:%S",
        level=logging.DEBUG,
    )
    logfile.close()
    return Path(logfile.name)


LOGFILE: Path = setup_logging()
LOG = logging.getLogger()


class RcloneBackend:
    """Upload backend that drives an rclone remote through subprocess."""

    def __init__(self, remote_name: str):
        """
        remote_name should be something like :myarchivedirectory:, which you
        should have configured elsewhere
        """
        self.remote_name = remote_name

    def __str__(self):
        return f"<{self.remote_name}>"

    def actual_path(self, destination_path: bytes) -> bytes:
        # Identity here; subclasses may prepend a remote-side prefix.
        # (Annotation fixed: callers pass bytes, the old `str` hint was wrong.)
        return destination_path

    def exists(self, destination_path: bytes) -> bool:
        """Return True if destination_path is already present on the remote.

        Raises subprocess.CalledProcessError for rclone failures other than
        "directory not found".
        """
        destination = b"%s:%s" % (
            self.remote_name.encode("utf8"),
            self.actual_path(destination_path),
        )
        cmd = ["rclone", "--quiet", "lsjson", destination]
        try:
            output = subprocess.check_output(cmd)
        except subprocess.CalledProcessError as exc:
            # rclone exit code 3 = directory not found -> file does not exist
            if exc.returncode == 3:
                return False
            raise
        data = json.loads(output)
        return bool(data)

    def reserve(self, filename: Path) -> None:
        """Hook for backends that must allocate a destination up front.

        rclone needs no reservation, so there is no URL to return yet.
        """
        return None

    def copy(self, filename: Path, destination_path: bytes):
        """
        raises in case of errors
        """
        destination = b"%s:%s" % (self.remote_name.encode("utf8"), destination_path)
        cmd = ["rclone", "--quiet", "copyto", str(filename), destination]
        subprocess.run(cmd, check=True)


class ArchiveBackend:
    """Upload backend for archive.org, using its S3-compatible API."""

    def __init__(self, accesskey: str, secret: str, bucketprefix: str = "upload"):
        self.accesskey = accesskey
        self.secret = secret
        self.bucketprefix = bucketprefix
        self.bucket = None  # final available bucket to be used
        self.dl_url = None  # final download URL

    def __str__(self):
        return ""

    @property
    def auth_headers(self) -> dict:
        """LOW-auth header expected by the archive.org S3 API."""
        return {"authorization": f"LOW {self.accesskey}:{self.secret}"}

    def exists(self, destination_path: bytes) -> bool:
        # XXX: we could check the *existence* of buckets through a GET, then
        # check if it is ours by looking at the *_meta.xml file, which has an
        # uploader field
        return False

    def reserve(self, filename: Path) -> str:
        """Create a fresh bucket for *filename* and return its download URL.

        Tries up to 5 bucket names (base name, then random numeric suffixes);
        raises ValueError if none can be created.  Sets self.bucket and
        self.dl_url as side effects.
        """
        bucketbase = re.sub(
            r"""[^a-zA-Z0-9_.-]""",  # based on what the archive.org documentation says
            "_",
            filename.name.rsplit(".", maxsplit=1)[0],
        )
        if not bucketbase.startswith(f"{self.bucketprefix}-"):
            bucketbase = f"{self.bucketprefix}-" + bucketbase
        bucketname = bucketbase
        attempts = 5
        for attempt in range(attempts):
            LOG.debug("trying %s", bucketname)
            resp = requests.put(
                f"https://s3.us.archive.org/{bucketname}",
                headers=self.auth_headers,
            )
            try:
                resp.raise_for_status()
            except requests.HTTPError as exc:
                if attempt < attempts - 1:
                    # name taken (or otherwise refused): retry with a suffix
                    bucketname = f"{bucketbase}-{random.randint(1000,9999)}"
                    continue
                else:
                    LOG.error("response was %s\n%s\n%s", resp, resp.headers, resp.text)
                    # fixed: the message used to be truncated after "for "
                    raise ValueError(
                        f"could not find a good bucket for {filename.name}"
                    ) from exc
            else:
                break
        LOG.info("Found good bucket: %s", bucketname)
        self.bucket = bucketname
        self.dl_url = f"https://archive.org/download/{self.bucket}/{filename.name}"
        return self.dl_url

    def copy(self, filename: Path, destination_path: bytes) -> str:
        """
        Upload *filename* into the bucket previously created by reserve().

        destination_path is ignored.  Returns the download URL; raises
        requests.HTTPError on upload failure.
        """
        upload_url = f"https://s3.us.archive.org/{self.bucket}/{filename.name}"
        # XXX: set some more headers based on file metadata (date, title, etc.)
        headers = {
            "x-archive-meta01-collection": "opensource",
            "x-archive-meta-language": "ita",
        }
        with filename.open("rb") as buf:
            resp = requests.put(
                upload_url,
                data=buf,
                headers={**headers, **self.auth_headers},
            )
            resp.raise_for_status()
        LOG.info("loaded on %s", self.dl_url)
        return self.dl_url


class ArkiwiBackend(RcloneBackend):
    """arkiwi.org backend: rclone remote with FTP upload workaround."""

    def __init__(self, remote_name: str, prefix: str):
        super().__init__(remote_name)
        self.prefix: bytes = prefix.strip("/").encode("utf8")

    def __str__(self):
        return ""

    def ftp_path(self, path: bytes) -> bytes:
        # Trailing slash: with `curl --upload-file`, a URL ending in "/" makes
        # curl append the local file name.
        return b"ftp://upload.arkiwi.org/%s/" % (path.strip(b"/"),)

    def actual_path(self, path: bytes) -> bytes:
        return self.prefix + b"/" + path.lstrip(b"/")

    def path_to_url(self, path: bytes) -> str:
        # this is rfc4648 section 5 (base64url with -_ alphabet, no padding)
        path = (
            b64encode(self.actual_path(path), altchars=b"-_")
            .rstrip(b"=")
            .decode("ascii")
        )
        return f"https://www.arkiwi.org/path64/{path}/redirect"

    # 2024-11-05: switched from rclone to curl to work around webdav bugs.
    # Later this whole method should be removed, so that we fall back to the
    # RcloneBackend implementation.
    def exists(self, destination_path: bytes) -> bool:
        # NOTE(review): the returncode == 3 check was inherited from the
        # rclone semantics; for curl, exit code 3 means "URL malformed",
        # not "file missing" — confirm against curl's exit-code table.
        cmd = ["curl", "--netrc", "--head", "--silent", self.ftp_path(destination_path)]
        try:
            output = subprocess.check_output(cmd)
        except subprocess.CalledProcessError as exc:
            if exc.returncode == 3:
                return False
            raise
        return bool(output)

    def copy(self, filename: Path, destination_path: bytes) -> str:
        """
        returns the URL
        """
        # 2024-11-05: since webdav is broken while FTP works, the webdav
        # (rclone) copy is replaced with one done via curl.
        # super().copy(filename, self.actual_path(destination_path))
        cmd = [
            "curl",
            "--netrc",
            "--upload-file",
            str(filename),
            self.ftp_path(destination_path),
        ]
        subprocess.run(cmd, check=True)
        url = self.path_to_url(destination_path)
        # verify the upload by comparing the advertised size with the local one
        response = requests.head(url, allow_redirects=True)
        response.raise_for_status()
        length = int(response.headers["Content-Length"])
        expected = filename.stat().st_size
        if length != expected:
            raise ValueError(
                "the uploaded file has a wrong size: %d instead of %d"
                % (length, expected)
            )
        return url


BACKENDS = {
    "arkiwi.org": ArkiwiBackend,
    "archive.org": ArchiveBackend,
    "default": ArkiwiBackend,
}


@contextlib.contextmanager
def zenity_pulsate(args):
    """Context manager showing a pulsating zenity progress dialog.

    Yields the dialog's stdin; writing "# text\\n" updates the label, and
    closing stdin dismisses the dialog.
    """
    proc = subprocess.Popen(
        ["zenity", "--auto-close", "--progress", "--pulsate", *args],
        universal_newlines=True,
        stdin=subprocess.PIPE,
    )
    try:
        yield proc.stdin
    finally:
        proc.stdin.close()


class Carichello:
    """CLI/GUI entry point: picks a file, uploads it, reports the URL."""

    def __init__(self):
        self.parser = self.get_parser()

    def get_parser(self) -> ArgumentParser:
        p = ArgumentParser()
        p.add_argument(
            "--config",
            type=Path,
            default=Path("~/.config/carichello/config.json").expanduser(),
        )
        p.add_argument("file", type=Path, nargs="?")
        return p

    def error(self) -> int:
        """Show a generic error dialog; returns the exit code 1."""
        LOG.error("generic error")
        subprocess.run(
            ["zenity", "--error", "--title=Errore caricamento", "--text=Errore!"]
        )
        return 1

    def error_exception(self, exc: Exception) -> int:
        """Show an error dialog for *exc*; returns the exit code 1."""
        LOG.exception("error")
        subprocess.run(
            [
                "zenity",
                "--error",
                "--title=Errore caricamento",
                f"--text=Errore!\n\n{exc}",
            ]
        )
        return 1

    def set_clipboard(self, text: str):
        """Copy *text* to the X11 clipboard (best effort, via xsel)."""
        subprocess.run(["xsel", "-bi"], input=text.encode("utf8"))

    def run(self) -> int:
        """Run the upload flow, mailing the logfile on any failure."""
        try:
            ret = self._run()
        except Exception:
            self._send_log(-1)
            raise
        else:
            if ret != 0:
                self._send_log(ret)
        return ret

    def send_mail(self, subject: str, text: str) -> bool:
        """Best-effort notification mail.

        Returns True when the mail command succeeded, False when sending
        failed or no recipient is configured.
        """
        mail_to = self.config.get("mail", {}).get("to", None)
        hostname = subprocess.check_output(["hostname", "-f"], encoding="utf8").strip()
        if mail_to:
            try:
                # check=True added: without it subprocess.run never raises
                # CalledProcessError, so the except branch was dead code and
                # mail failures went unnoticed.
                subprocess.run(
                    ["mail", "-s", subject, mail_to],
                    input=f"{hostname}\n\n{text}",
                    encoding="utf8",
                    check=True,
                )
                return True
            except subprocess.CalledProcessError:
                LOG.error("error sending email")
                return False
        # explicit False (was an implicit None) to honor the bool contract
        return False

    def _send_log(self, ret: int):
        """Mail the entire logfile of this run to the configured recipient."""
        contents = LOGFILE.read_text()
        self.send_mail("Carichello log ERROR", contents)

    def _run(self) -> int:
        """Actual workflow; returns a process exit code (0 on success)."""
        LOG.info("start")
        self.args = self.parser.parse_args()
        with self.args.config.open() as buf:
            self.config = json.load(buf)
        backend_config = self.config["backends"][0]
        backend_config.setdefault("type", "default")
        BackendCls = BACKENDS[backend_config["type"]]
        if self.args.file is None:
            # no file on the command line: ask with a zenity file chooser
            output = subprocess.check_output(
                [
                    "zenity",
                    "--file-selection",
                    "--file-filter=Audio files | *.ogg | *.oga | *.opus",
                    "--title=Seleziona il file da caricare",
                ],
                text=True,
            )
            if not output:
                return 1
            self.args.file = Path(output.rstrip("\n"))
        if not self.args.file.exists():
            subprocess.run(
                ["zenity", "--error", f"--text=Il file '{self.args.file}' non esiste"]
            )
            return 1
        # destination layout: /<year>/<month>/<original file name>
        now = datetime.datetime.now()
        dest_directory = f"/{now.year}/{now.month}"
        dest_file = f"{dest_directory}/{self.args.file.name}".encode("utf8")
        backend = BackendCls(**backend_config.get("config", {}))
        if hasattr(backend, "path_to_url"):
            # backend can predict the final URL before uploading
            url = backend.path_to_url(dest_file)
            LOG.info("file %s would be uploaded to %s", str(self.args.file), url)
        else:
            url = None
            LOG.info("file %s would be uploaded", str(self.args.file))
        with zenity_pulsate(
            [
                f"--title=Caricamento file su {backend}",
                f"--text=Verifiche file {self.args.file.name} in corso...",
            ]
        ) as zenity:
            try:
                exists = backend.exists(dest_file)
            except subprocess.CalledProcessError as exc:
                zenity.close()
                return self.error_exception(exc)
            if exists:
                zenity.close()
                subprocess.run(
                    [
                        "zenity",
                        "--info",
                        f"--title=Caricamento su {backend}",
                        f"--text=File {self.args.file.name} già presente:\n{url}",
                    ]
                )
                return 1
            zenity.write(f"# Creazione item per {self.args.file.name}\n")
            zenity.flush()
            reserved_url = backend.reserve(self.args.file)
            if url is None:
                url = reserved_url
            else:
                # a backend that predicted the URL must reserve the same one
                assert url == reserved_url
            if url:
                # the URL is already known: hand it to the user right away
                self.set_clipboard(url)
                text = (
                    f"Caricamento su {url} in corso... Copia l'indirizzo da usare: 📋"
                )
            else:
                text = f"Caricamento {self.args.file.name} in corso..."
            zenity.write(f"# {text}\n")
            zenity.flush()
            try:
                url = backend.copy(self.args.file, dest_file)
            except Exception as exc:
                zenity.close()
                return self.error_exception(exc)
        LOG.info("ready: %s", url)
        self.set_clipboard(url)
        self.send_mail("Nuovo file caricato", url)
        subprocess.run(
            [
                "zenity",
                "--info",
                f"--text=Il file {self.args.file.name} è stato caricato:\n\n{url}",
            ]
        )
        return 0


if __name__ == "__main__":
    sys.exit(Carichello().run())