carichello/carichello.py
2024-11-05 11:11:11 +01:00

270 lines
8.7 KiB
Python

#!/usr/bin/python3
import datetime
import sys
import json
from base64 import b64encode
from pathlib import Path
from argparse import ArgumentParser
from subprocess import Popen, PIPE
import subprocess
import tempfile
import logging
import requests
def setup_logging():
logdir = Path("~/.local/share/carichello/").expanduser()
logdir.mkdir(exist_ok=True, parents=True)
today = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M")
logfile = tempfile.NamedTemporaryFile(
dir=logdir, prefix=f"{today}-", suffix=".txt", delete=False
)
logging.basicConfig(filename=logfile.name, filemode="a", level=logging.DEBUG)
logfile.close()
setup_logging()
LOG = logging.getLogger()
class RcloneBackend:
def __init__(self, remote_name: str):
"""
remote_name should be something like :myarchivedirectory:, which you should have configured elsewhere
"""
self.remote_name = remote_name
def actual_path(self, destination_path: str):
return destination_path
def exists(self, destination_path: bytes):
destination = b"%s:%s" % (
self.remote_name.encode("utf8"),
self.actual_path(destination_path),
)
cmd = ["rclone", "--quiet", "lsjson", destination]
try:
output = subprocess.check_output(cmd)
except subprocess.CalledProcessError as exc:
if exc.returncode == 3:
return False
raise
data = json.loads(output)
return bool(data)
def copy(self, filename: Path, destination_path: bytes):
"""
raises in case of errors
"""
destination = b"%s:%s" % (self.remote_name.encode("utf8"), destination_path)
cmd = ["rclone", "--quiet", "copyto", str(filename), destination]
subprocess.run(cmd, check=True)
class ArchiveBackend(RcloneBackend):
# XXX: there should be a way to "reserve" a bucket
# with that, we could now the URL before copying
# XXX: use x-archive-meta$field header to set metadata based on file contents, see
# https://github.com/vmbrasseur/IAS3API/blob/master/metadata.md
# they can be set using --header-upload, see https://rclone.org/docs/#header-upload
pass
class ArkiwiBackend(RcloneBackend):
def __init__(self, remote_name: str, prefix: str):
super().__init__(remote_name)
self.prefix: bytes = prefix.strip("/").encode("utf8")
def ftp_path(self, path: bytes) -> bytes:
return b"ftp://upload.arkiwi.org/%s/" % (destination_path.strip(b"/"),))
def actual_path(self, path: bytes) -> bytes:
return self.prefix + b"/" + path.lstrip(b"/")
def path_to_url(self, path: bytes) -> str:
# this is rfc4648 section 5
path = (
b64encode(self.actual_path(path), altchars=b"-_")
.rstrip(b"=")
.decode("ascii")
)
return f"https://www.arkiwi.org/path64/{path}/redirect"
#2024-11-05: cambio da rclone a curl per girare intorno ai bug di webdav. Poi bisogna togliere l'intero metodo, così torniamo a quello di RcloneBackend
def exists(self, destination_path: bytes):
cmd = ["curl", "--netrc", "--head", "--silent", self.ftp_path(destination_path)]
try:
output = subprocess.check_output(cmd)
except subprocess.CalledProcessError as exc:
if exc.returncode == 3:
return False
raise
return bool(output)
def copy(self, filename: Path, destination_path: bytes) -> str:
"""
returns the URL
"""
# 2024-11-05: siccome webdav è rotto e invece FTP funziona, sostituisco la copia webdav (rclone) con una fatta con curl
# super().copy(filename, self.actual_path(destination_path))
cmd = ["curl", "--netrc", "--upload-file", str(filename), self.ftp_path(destination_path)]
subprocess.run(cmd, check=True)
url = self.path_to_url(destination_path)
response = requests.head(url, allow_redirects=True)
response.raise_for_status()
length = int(response.headers["Content-Length"])
expected = filename.stat().st_size
if length != expected:
raise ValueError(
"the uploaded file has a wrong size: %d instead of %d"
% (length, expected)
)
return url
BACKENDS = {
"arkiwi.org": ArkiwiBackend,
"archive.org": ArchiveBackend,
"default": ArkiwiBackend,
}
class Carichello:
def __init__(self):
self.parser = self.get_parser()
def get_parser(self):
p = ArgumentParser()
p.add_argument(
"--config",
type=Path,
default=Path("~/.config/carichello/config.json").expanduser(),
)
p.add_argument("file", type=Path, nargs='?')
return p
def error(self) -> int:
LOG.error("generic error")
subprocess.run(
["zenity", "--error", "--title=Errore caricamento", "--text=Errore!"]
)
return 1
def error_exception(self, exc: Exception) -> int:
LOG.exception("error")
subprocess.run(
[
"zenity",
"--error",
"--title=Errore caricamento",
f"--text=Errore!\n\n<tt>{exc}</tt>",
]
)
return 1
def set_clipboard(self, text: str):
subprocess.run(["xsel", "-bi"], input=text.encode("utf8"))
def run(self) -> int:
LOG.info("start")
self.args = self.parser.parse_args()
with self.args.config.open() as buf:
self.config = json.load(buf)
self.config.setdefault("type", "default")
BackendCls = BACKENDS[self.config["type"]]
if self.args.file is None:
output = subprocess.check_output(
[
"zenity",
"--file-selection",
"--file-filter=Audio files | *.ogg | *.oga | *.opus",
"--title=Seleziona il file da caricare",
],
)
if not output:
return 1
self.args.file = Path(output)
if not self.args.file.exists():
subprocess.run([
"zenity", "--error", "--text=Il file selezionato non esiste"
])
return 1
now = datetime.datetime.now()
dest_directory = f"/{now.year}/{now.month}"
dest_file = f"{dest_directory}/{self.args.file.name}".encode("utf8")
if (
BackendCls == ArkiwiBackend
): # XXX: reimplementa in modo che questo if non sia necessario
backend = BackendCls(self.config["remote"], self.config["prefix"])
else:
backend = BackendCls(self.config["remote"])
if hasattr(backend, "path_to_url"):
url = backend.path_to_url(dest_file)
LOG.info("file %s would be uploaded to %s", str(self.args.file), url)
else:
url = None
LOG.info("file %s would be uploaded", str(self.args.file))
zenity = Popen(
[
"zenity",
"--auto-close",
"--progress",
"--pulsate",
"--title=Caricamento su Arkiwi",
f"--text=Verifiche file {self.args.file.name} in corso...",
],
stdin=PIPE,
)
try:
exists = backend.exists(dest_file)
zenity.stdin.close()
if exists:
subprocess.run(
[
"zenity",
"--info",
"--title=Caricamento su Arkiwi",
f"--text=File {self.args.file.name} già presente:\n{url}",
]
)
return 1
except subprocess.CalledProcessError as exc:
zenity.stdin.close()
return self.error_exception(exc)
self.set_clipboard(url)
zenity = Popen(
[
"zenity",
"--auto-close",
"--progress",
"--pulsate",
f"--text=Caricamento file {self.args.file.name} su arkiwi in corso...\n Copia l'indirizzo da usare <a href='#'>📋 </a>",
],
stdin=PIPE,
)
try:
url = backend.copy(self.args.file, dest_file)
except Exception as exc:
return self.error_exception(exc)
finally:
zenity.stdin.close()
LOG.info("ready: %s", url)
subprocess.run(
[
"zenity",
"--info",
f"--text=Il file {self.args.file.name} è stato caricato:\n\n{url}",
]
)
return 0
if __name__ == "__main__":
sys.exit(Carichello().run())