237 lines
7.3 KiB
Python
237 lines
7.3 KiB
Python
#!/usr/bin/python3
|
|
|
|
import datetime
|
|
import sys
|
|
import json
|
|
from base64 import b64encode
|
|
from pathlib import Path
|
|
from argparse import ArgumentParser
|
|
from subprocess import Popen, PIPE
|
|
import subprocess
|
|
import tempfile
|
|
import logging
|
|
|
|
import requests
|
|
|
|
|
|
def setup_logging():
|
|
logdir = Path("~/.local/share/carichello/").expanduser()
|
|
logdir.mkdir(exist_ok=True, parents=True)
|
|
today = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M")
|
|
logfile = tempfile.NamedTemporaryFile(
|
|
dir=logdir, prefix=f"{today}-", suffix=".txt", delete=False
|
|
)
|
|
logging.basicConfig(filename=logfile.name, filemode="a", level=logging.DEBUG)
|
|
logfile.close()
|
|
|
|
|
|
setup_logging()
|
|
LOG = logging.getLogger()
|
|
|
|
|
|
class RcloneBackend:
|
|
def __init__(self, remote_name: str):
|
|
"""
|
|
remote_name should be something like :myarchivedirectory:, which you should have configured elsewhere
|
|
"""
|
|
self.remote_name = remote_name
|
|
|
|
def actual_path(self, destination_path: str):
|
|
return destination_path
|
|
|
|
def exists(self, destination_path: bytes):
|
|
destination = b"%s:%s" % (
|
|
self.remote_name.encode("utf8"),
|
|
self.actual_path(destination_path),
|
|
)
|
|
cmd = ["rclone", "--quiet", "lsjson", destination]
|
|
try:
|
|
output = subprocess.check_output(cmd)
|
|
except subprocess.CalledProcessError as exc:
|
|
if exc.returncode == 3:
|
|
return False
|
|
raise
|
|
|
|
data = json.loads(output)
|
|
return bool(data)
|
|
|
|
def copy(self, filename: Path, destination_path: bytes):
|
|
"""
|
|
raises in case of errors
|
|
"""
|
|
destination = b"%s:%s" % (self.remote_name.encode("utf8"), destination_path)
|
|
cmd = ["rclone", "--quiet", "copyto", str(filename), destination]
|
|
subprocess.run(cmd, check=True)
|
|
|
|
|
|
class ArchiveBackend(RcloneBackend):
|
|
# XXX: there should be a way to "reserve" a bucket
|
|
# with that, we could now the URL before copying
|
|
# XXX: use x-archive-meta$field header to set metadata based on file contents, see
|
|
# https://github.com/vmbrasseur/IAS3API/blob/master/metadata.md
|
|
# they can be set using --header-upload, see https://rclone.org/docs/#header-upload
|
|
pass
|
|
|
|
|
|
class ArkiwiBackend(RcloneBackend):
|
|
def __init__(self, remote_name: str, prefix: str):
|
|
super().__init__(remote_name)
|
|
self.prefix: bytes = prefix.strip("/").encode("utf8")
|
|
|
|
def actual_path(self, path: bytes) -> bytes:
|
|
return self.prefix + b"/" + path.lstrip(b"/")
|
|
|
|
def path_to_url(self, path: bytes) -> str:
|
|
# this is rfc4648 section 5
|
|
path = (
|
|
b64encode(self.actual_path(path), altchars=b"-_")
|
|
.rstrip(b"=")
|
|
.decode("ascii")
|
|
)
|
|
return f"https://www.arkiwi.org/path64/{path}/redirect"
|
|
|
|
def exists(self, destination_path: bytes):
|
|
return False
|
|
|
|
def copy(self, filename: Path, destination_path: bytes) -> str:
|
|
"""
|
|
returns the URL
|
|
"""
|
|
super().copy(filename, self.actual_path(destination_path))
|
|
url = self.path_to_url(destination_path)
|
|
response = requests.head(url, allow_redirects=True)
|
|
response.raise_for_status()
|
|
length = int(response.headers["Content-Length"])
|
|
expected = filename.stat().st_size
|
|
if length != expected:
|
|
raise ValueError(
|
|
"the uploaded file has a wrong size: %d instead of %d"
|
|
% (length, expected)
|
|
)
|
|
return url
|
|
|
|
|
|
BACKENDS = {
|
|
"arkiwi.org": ArkiwiBackend,
|
|
"archive.org": ArchiveBackend,
|
|
"default": ArkiwiBackend,
|
|
}
|
|
|
|
|
|
class Carichello:
|
|
def __init__(self):
|
|
self.parser = self.get_parser()
|
|
|
|
def get_parser(self):
|
|
p = ArgumentParser()
|
|
p.add_argument(
|
|
"--config",
|
|
type=Path,
|
|
default=Path("~/.config/carichello/config.json").expanduser(),
|
|
)
|
|
p.add_argument("file", type=Path)
|
|
return p
|
|
|
|
def error(self) -> int:
|
|
LOG.error("generic error")
|
|
subprocess.run(
|
|
["zenity", "--error", "--title=Errore caricamento", "--text=Errore!"]
|
|
)
|
|
return 1
|
|
|
|
def error_exception(self, exc: Exception) -> int:
|
|
LOG.exception("error")
|
|
subprocess.run(
|
|
[
|
|
"zenity",
|
|
"--error",
|
|
"--title=Errore caricamento",
|
|
f"--text=Errore!\n\n<tt>{exc}</tt>",
|
|
]
|
|
)
|
|
return 1
|
|
|
|
def set_clipboard(self, text: str):
|
|
subprocess.run(["xsel", "-bi"], input=text.encode("utf8"))
|
|
|
|
def run(self) -> int:
|
|
LOG.info("start")
|
|
self.args = self.parser.parse_args()
|
|
with self.args.config.open() as buf:
|
|
self.config = json.load(buf)
|
|
self.config.setdefault("type", "default")
|
|
BackendCls = BACKENDS[self.config["type"]]
|
|
now = datetime.datetime.now()
|
|
dest_directory = f"/{now.year}/{now.month}"
|
|
dest_file = f"{dest_directory}/{self.args.file.name}".encode("utf8")
|
|
if (
|
|
BackendCls == ArkiwiBackend
|
|
): # XXX: reimplementa in modo che questo if non sia necessario
|
|
backend = BackendCls(self.config["remote"], self.config["prefix"])
|
|
else:
|
|
backend = BackendCls(self.config["remote"])
|
|
|
|
if hasattr(backend, "path_to_url"):
|
|
url = backend.path_to_url(dest_file)
|
|
LOG.info("file %s would be uploaded to %s", str(self.args.file), url)
|
|
else:
|
|
url = None
|
|
LOG.info("file %s would be uploaded", str(self.args.file))
|
|
zenity = Popen(
|
|
[
|
|
"zenity",
|
|
"--auto-close",
|
|
"--progress",
|
|
"--pulsate",
|
|
"--title=Caricamento su Arkiwi",
|
|
f"--text=Verifiche file {self.args.file.name} in corso...",
|
|
],
|
|
stdin=PIPE,
|
|
)
|
|
try:
|
|
exists = backend.exists(dest_file)
|
|
zenity.stdin.close()
|
|
if exists:
|
|
subprocess.run(
|
|
[
|
|
"zenity",
|
|
"--info",
|
|
"--title=Caricamento su Arkiwi",
|
|
f"--text=File {self.args.file.name} già presente:\n{url}",
|
|
]
|
|
)
|
|
return 1
|
|
except subprocess.CalledProcessError as exc:
|
|
zenity.stdin.close()
|
|
return self.error_exception(exc)
|
|
|
|
self.set_clipboard(url)
|
|
zenity = Popen(
|
|
[
|
|
"zenity",
|
|
"--auto-close",
|
|
"--progress",
|
|
"--pulsate",
|
|
f"--text=Caricamento file {self.args.file.name} su arkiwi in corso...\n Copia l'indirizzo da usare <a href='#'>📋 </a>",
|
|
],
|
|
stdin=PIPE,
|
|
)
|
|
try:
|
|
url = backend.copy(self.args.file, dest_file)
|
|
except Exception as exc:
|
|
return self.error_exception(exc)
|
|
finally:
|
|
zenity.stdin.close()
|
|
LOG.info("ready: %s", url)
|
|
subprocess.run(
|
|
[
|
|
"zenity",
|
|
"--info",
|
|
f"--text=Il file {self.args.file.name} è stato caricato:\n\n{url}",
|
|
]
|
|
)
|
|
return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
sys.exit(Carichello().run())
|