carichello/carichello.py

367 lines
12 KiB
Python
Raw Normal View History

2024-10-22 00:51:25 +02:00
#!/usr/bin/python3
import datetime
import sys
import json
from base64 import b64encode
from pathlib import Path
from argparse import ArgumentParser
from subprocess import Popen, PIPE
import subprocess
import tempfile
import logging
2024-11-17 17:44:23 +01:00
import random
import contextlib
2024-10-22 00:51:25 +02:00
import requests
def setup_logging():
    """Route the root logger to a brand-new log file for this run.

    The file is created under ``~/.local/share/carichello/`` (the directory
    is created when missing). Its name starts with the current date and time
    and ends in ``.txt``; ``tempfile`` supplies the unique middle part.
    """
    log_root = Path("~/.local/share/carichello/").expanduser()
    log_root.mkdir(parents=True, exist_ok=True)

    stamp = f"{datetime.datetime.now():%Y-%m-%d_%H:%M}"
    # delete=False: only the unique name is wanted from tempfile, the file
    # itself has to survive once this handle is closed.
    with tempfile.NamedTemporaryFile(
        dir=log_root, prefix=f"{stamp}-", suffix=".txt", delete=False
    ) as handle:
        log_path = handle.name

    logging.basicConfig(
        filename=log_path,
        filemode="a",
        format="%(asctime)s:%(levelname)s:%(message)s",
        datefmt="%Y-%m-%d_%H:%M:%S",
        level=logging.DEBUG,
    )
# Logging is configured as an import-time side effect, before anything logs.
setup_logging()
# Root logger, used by the backends and by Carichello below.
LOG = logging.getLogger()
class RcloneBackend:
    """Storage backend that shells out to the ``rclone`` command-line tool.

    All remote paths are handled as ``bytes``; they are joined to the remote
    name as ``<remote_name>:<path>`` before being handed to rclone.
    """

    def __init__(self, remote_name: str):
        """
        remote_name should be something like :myarchivedirectory:, which you should have configured elsewhere

        NOTE(review): exists() and copy() add the ":" separator between the
        remote name and the path themselves — confirm whether the trailing
        colon in the example above is really expected in the configuration.
        """
        self.remote_name = remote_name

    def __str__(self) -> str:
        return f"<{self.remote_name}>"

    def actual_path(self, destination_path: bytes) -> bytes:
        """Map a logical destination path to the path used on the remote.

        The base implementation is the identity; subclasses may override it.
        The value is interpolated into a bytes template by exists(), so it
        must be ``bytes`` (a ``str`` would raise TypeError there).
        """
        return destination_path

    def exists(self, destination_path: bytes) -> bool:
        """Return True when ``rclone lsjson`` lists something at the path.

        Raises subprocess.CalledProcessError for any rclone failure other
        than exit status 3.
        """
        destination = b"%s:%s" % (
            self.remote_name.encode("utf8"),
            self.actual_path(destination_path),
        )
        cmd = ["rclone", "--quiet", "lsjson", destination]
        try:
            output = subprocess.check_output(cmd)
        except subprocess.CalledProcessError as exc:
            # Exit status 3 is rclone's "directory not found"
            if exc.returncode == 3:
                return False
            raise
        # An empty JSON listing also counts as "not there"
        return bool(json.loads(output))

    def reserve(self, filename: Path) -> None:
        """Nothing to reserve for this backend; always returns None."""
        return None

    def copy(self, filename: Path, destination_path: bytes) -> None:
        """
        Upload filename to destination_path with ``rclone copyto``.

        destination_path is used as given (actual_path() is not applied here).

        raises in case of errors
        """
        destination = b"%s:%s" % (self.remote_name.encode("utf8"), destination_path)
        cmd = ["rclone", "--quiet", "copyto", str(filename), destination]
        subprocess.run(cmd, check=True)
2024-11-17 17:44:23 +01:00
class ArchiveBackend:
    """Backend that uploads files to archive.org through s3.us.archive.org.

    Usage is two-step: reserve() creates a bucket and fixes the download URL,
    then copy() uploads the file into that bucket.
    """

    def __init__(self, accesskey: str, secret: str, bucketprefix: str = "upload"):
        self.accesskey = accesskey
        self.secret = secret
        self.bucketprefix = bucketprefix
        self.bucket = None  # final available bucket to be used
        self.dl_url = None  # final download URL

    def __str__(self):
        return "<archive.org>"

    @property
    def auth_headers(self):
        """HTTP headers carrying the access key and secret."""
        return {"authorization": f"LOW {self.accesskey}:{self.secret}"}

    def exists(self, destination_path: bytes):
        """Always report the file as absent (no real check is implemented)."""
        # XXX: we could check the *existence* of buckets through a GET, then check if it is our by looking at
        # the *_meta.xml file, which has an uploader field
        return False

    def reserve(self, filename: Path) -> str:
        """Create a bucket for filename and return the future download URL.

        The bucket name is the file name without its last extension, with
        remaining dots turned into underscores and prefixed with
        ``<bucketprefix>-`` unless already present. If creating the bucket
        fails, a random 4-digit suffix is tried, for 5 attempts overall.

        On success self.bucket and self.dl_url are set.

        Raises ValueError (chained to the last requests.HTTPError) when every
        attempt failed.
        """
        bucketbase = filename.name.rsplit(".", maxsplit=1)[0].replace(".", "_")
        if not bucketbase.startswith(f"{self.bucketprefix}-"):
            bucketbase = f"{self.bucketprefix}-" + bucketbase

        bucketname = bucketbase
        attempts = 5
        for attempt in range(attempts):
            LOG.debug("trying %s", bucketname)
            resp = requests.put(
                f"https://s3.us.archive.org/{bucketname}",
                headers=self.auth_headers,
            )
            try:
                resp.raise_for_status()
            except requests.HTTPError as exc:
                if attempt < attempts - 1:
                    # Retry under a randomized variant of the name
                    bucketname = f"{bucketbase}-{random.randint(1000,9999)}"
                    continue
                LOG.error("response was %s\n%s\n%s", resp, resp.headers, resp.text)
                raise ValueError(
                    f"could not find a good bucket for {filename.name}"
                ) from exc
            else:
                break

        LOG.info("Found good bucket: %s", bucketname)
        self.bucket = bucketname
        self.dl_url = f"https://archive.org/download/{self.bucket}/{filename.name}"
        return self.dl_url

    def copy(self, filename: Path, destination_path: bytes) -> str:
        """
        Upload filename into the bucket obtained by reserve(); return the
        download URL.

        destination_path is ignored

        Raises RuntimeError if reserve() has not succeeded yet, and
        requests.HTTPError if the upload is rejected.
        """
        if self.bucket is None:
            # Without this guard the upload would target a bucket named "None"
            raise RuntimeError("reserve() must be called before copy()")

        upload_url = f"https://s3.us.archive.org/{self.bucket}/{filename.name}"
        # XXX: set some more header based on file metadata (date, title, etc.)
        headers = {
            "x-archive-meta01-collection": "opensource",
            "x-archive-meta-language": "ita",
        }
        # The open file object is passed as the request body
        with filename.open("rb") as buf:
            resp = requests.put(
                upload_url,
                data=buf,
                headers={**headers, **self.auth_headers},
            )
        resp.raise_for_status()
        LOG.info("loaded on %s", self.dl_url)
        return self.dl_url
2024-10-22 13:11:47 +02:00
2024-10-22 00:51:25 +02:00
class ArkiwiBackend(RcloneBackend):
    """Backend for www.arkiwi.org.

    Existence checks and uploads currently go through ``curl`` over FTP (see
    the dated notes on exists() and copy()); the public URL of a file is
    derived from its path by path_to_url().
    """

    # curl exit statuses that exists() reads as "nothing there". Per the curl
    # manual: 9 = FTP access denied (most often a change into a directory that
    # does not exist), 19 = FTP RETR failed, 78 = remote resource not found.
    # 3 means "URL malformed" in curl (it was rclone's "not found" code); it
    # is kept only so inputs that used to yield False still do.
    _CURL_MISSING_CODES = frozenset({3, 9, 19, 78})

    def __init__(self, remote_name: str, prefix: str):
        super().__init__(remote_name)
        # Stored without leading/trailing slashes; actual_path() adds the separator
        self.prefix: bytes = prefix.strip("/").encode("utf8")

    def __str__(self):
        return "<www.arkiwi.org>"

    def ftp_path(self, path: bytes) -> bytes:
        """Return the ftp://upload.arkiwi.org/ URL for path.

        Surrounding slashes of path are dropped and the URL always ends
        with "/". The prefix is not part of this URL.
        """
        # NOTE(review): curl documents a trailing "/" on an --upload-file URL
        # as "append the local file name" — confirm that is what is wanted.
        return b"ftp://upload.arkiwi.org/%s/" % (path.strip(b"/"),)

    def actual_path(self, path: bytes) -> bytes:
        """Return path placed under the configured prefix."""
        return self.prefix + b"/" + path.lstrip(b"/")

    def path_to_url(self, path: bytes) -> str:
        """Return the public https URL under which path is reachable."""
        # this is rfc4648 section 5
        path = (
            b64encode(self.actual_path(path), altchars=b"-_")
            .rstrip(b"=")
            .decode("ascii")
        )
        return f"https://www.arkiwi.org/path64/{path}/redirect"

    # 2024-11-05: switched from rclone to curl to work around webdav bugs.
    # Later on this whole method has to go, so that we are back to the one
    # from RcloneBackend
    def exists(self, destination_path: bytes):
        """Return True when a curl HEAD request on the FTP URL yields output.

        Returns False when curl exits with one of _CURL_MISSING_CODES; any
        other curl failure raises subprocess.CalledProcessError.
        """
        cmd = ["curl", "--netrc", "--head", "--silent", self.ftp_path(destination_path)]
        try:
            output = subprocess.check_output(cmd)
        except subprocess.CalledProcessError as exc:
            if exc.returncode in self._CURL_MISSING_CODES:
                return False
            raise
        return bool(output)

    def copy(self, filename: Path, destination_path: bytes) -> str:
        """
        Upload filename with curl, then verify it through its public URL.

        returns the URL

        Raises subprocess.CalledProcessError if curl fails,
        requests.HTTPError if the public URL answers with an error status,
        and ValueError if the reported Content-Length differs from the size
        of the local file.
        """
        # 2024-11-05: since webdav is broken while FTP works, the webdav copy
        # (rclone) is replaced with one done by curl. The former call was:
        # super().copy(filename, self.actual_path(destination_path))
        cmd = [
            "curl",
            "--netrc",
            "--upload-file",
            str(filename),
            self.ftp_path(destination_path),
        ]
        subprocess.run(cmd, check=True)

        url = self.path_to_url(destination_path)
        response = requests.head(url, allow_redirects=True)
        response.raise_for_status()
        length = int(response.headers["Content-Length"])
        expected = filename.stat().st_size
        if length != expected:
            raise ValueError(
                "the uploaded file has a wrong size: %d instead of %d"
                % (length, expected)
            )
        return url
2024-10-22 13:11:47 +02:00
# Maps the "type" value of a backend entry in the configuration file to the
# class implementing it. Carichello.run() fills in "default" when an entry
# has no "type" key.
BACKENDS = {
    "arkiwi.org": ArkiwiBackend,
    "archive.org": ArchiveBackend,
    "default": ArkiwiBackend,
}
@contextlib.contextmanager
def zenity_pulsate(args):
    """Show a pulsating zenity progress dialog for the duration of the block.

    args are extra command-line options for zenity (e.g. ``--title=...``).
    The value bound by ``with ... as`` is the text-mode stdin of the zenity
    process; the caller writes lines to it to drive the dialog and may close
    it early to dismiss the dialog.

    On exit stdin is closed and the zenity process is reaped, so no zombie
    child is left behind.
    """
    proc = subprocess.Popen(
        ["zenity", "--auto-close", "--progress", "--pulsate", *args],
        universal_newlines=True,
        stdin=subprocess.PIPE,
    )
    try:
        yield proc.stdin
    finally:
        # close() flushes first; that fails with BrokenPipeError when zenity
        # is already gone (e.g. the dialog was dismissed), which is harmless.
        with contextlib.suppress(BrokenPipeError):
            proc.stdin.close()
        try:
            # With --auto-close zenity is expected to quit once stdin hits EOF
            proc.wait(timeout=5)
        except subprocess.TimeoutExpired:
            # Do not hang on a dialog that refuses to go away
            proc.terminate()
            proc.wait()
2024-10-22 00:51:25 +02:00
class Carichello:
    """Desktop front-end: pick a file, upload it with the configured
    backend, and hand the resulting URL to the user.

    Interaction with the user happens through ``zenity`` dialogs; the URL is
    also placed on the clipboard with ``xsel``.
    """

    def __init__(self):
        self.parser = self.get_parser()

    def get_parser(self):
        """Build the command-line parser (``--config`` and optional ``file``)."""
        p = ArgumentParser()
        p.add_argument(
            "--config",
            type=Path,
            default=Path("~/.config/carichello/config.json").expanduser(),
        )
        p.add_argument("file", type=Path, nargs="?")
        return p

    def error(self) -> int:
        """Log and show a generic error dialog; return exit status 1."""
        LOG.error("generic error")
        subprocess.run(
            ["zenity", "--error", "--title=Errore caricamento", "--text=Errore!"]
        )
        return 1

    def error_exception(self, exc: Exception) -> int:
        """Log the exception being handled, show it in an error dialog and
        return exit status 1.

        Meant to be called from inside an ``except`` block, so that the
        logged record carries the traceback.
        """
        from html import escape

        LOG.exception("error")
        # The text is interpreted as markup: a raw "<" or "&" coming from the
        # exception message would otherwise break the dialog.
        details = escape(str(exc), quote=False)
        subprocess.run(
            [
                "zenity",
                "--error",
                "--title=Errore caricamento",
                f"--text=Errore!\n\n<tt>{details}</tt>",
            ]
        )
        return 1

    def set_clipboard(self, text: str):
        """Put text on the clipboard via ``xsel -bi``."""
        subprocess.run(["xsel", "-bi"], input=text.encode("utf8"))

    @staticmethod
    def _reconcile_urls(predicted, reserved):
        """Choose the URL to advertise before the upload starts.

        predicted is the URL computed up front (or None); reserved is what
        the backend's reserve() returned (or None when it reserves nothing).
        Raises ValueError when both are set and disagree.
        """
        if reserved is None:
            return predicted
        if predicted is not None and predicted != reserved:
            raise ValueError(
                f"backend reserved {reserved}, but {predicted} was expected"
            )
        return reserved

    def run(self) -> int:
        """Run the whole upload flow; return the process exit status.

        Returns 0 after a successful upload and 1 in every other case (no
        file chosen, file missing, file already present, any upload error).
        """
        LOG.info("start")
        self.args = self.parser.parse_args()
        with self.args.config.open() as buf:
            self.config = json.load(buf)

        # Only the first configured backend is used
        backend_config = self.config["backends"][0]
        backend_config.setdefault("type", "default")
        BackendCls = BACKENDS[backend_config["type"]]

        if self.args.file is None:
            try:
                output = subprocess.check_output(
                    [
                        "zenity",
                        "--file-selection",
                        "--file-filter=Audio files | *.ogg | *.oga | *.opus",
                        "--title=Seleziona il file da caricare",
                    ],
                    text=True,
                )
            except subprocess.CalledProcessError:
                # zenity exits with a non-zero status when the chooser is
                # cancelled or closed: that is "no file", not a crash.
                LOG.info("no file selected")
                return 1
            if not output:
                return 1
            self.args.file = Path(output.rstrip("\n"))

        if not self.args.file.exists():
            subprocess.run(
                ["zenity", "--error", f"--text=Il file '{self.args.file}' non esiste"]
            )
            return 1

        now = datetime.datetime.now()
        dest_directory = f"/{now.year}/{now.month}"
        dest_file = f"{dest_directory}/{self.args.file.name}".encode("utf8")
        backend = BackendCls(**backend_config.get("config", {}))

        # Some backends can tell the final URL before uploading anything
        if hasattr(backend, "path_to_url"):
            url = backend.path_to_url(dest_file)
            LOG.info("file %s would be uploaded to %s", str(self.args.file), url)
        else:
            url = None
            LOG.info("file %s would be uploaded", str(self.args.file))

        with zenity_pulsate(
            [
                f"--title=Caricamento file su {backend}",
                f"--text=Verifiche file {self.args.file.name} in corso...",
            ]
        ) as zenity:
            try:
                exists = backend.exists(dest_file)
            except subprocess.CalledProcessError as exc:
                zenity.close()
                return self.error_exception(exc)

            if exists:
                zenity.close()
                subprocess.run(
                    [
                        "zenity",
                        "--info",
                        f"--title=Caricamento su {backend}",
                        f"--text=File {self.args.file.name} già presente:\n{url}",
                    ]
                )
                return 1

            zenity.write(f"# Creazione item per {self.args.file.name}\n")
            zenity.flush()
            try:
                # reserve() returns None for backends with nothing to
                # reserve; that must not be mistaken for a URL mismatch.
                url = self._reconcile_urls(url, backend.reserve(self.args.file))
            except Exception as exc:
                zenity.close()
                return self.error_exception(exc)

            if url:
                self.set_clipboard(url)
                text = (
                    f"Caricamento su {url} in corso... Copia l'indirizzo da usare: 📋"
                )
            else:
                text = f"Caricamento {self.args.file.name} in corso..."
            zenity.write(f"# {text}\n")
            zenity.flush()

            try:
                url = backend.copy(self.args.file, dest_file)
            except Exception as exc:
                zenity.close()
                return self.error_exception(exc)

        LOG.info("ready: %s", url)
        self.set_clipboard(url)
        subprocess.run(
            [
                "zenity",
                "--info",
                f"--text=Il file {self.args.file.name} è stato caricato:\n\n{url}",
            ]
        )
        return 0
# Script entry point: the process exit status is whatever run() returns.
if __name__ == "__main__":
    sys.exit(Carichello().run())