188 lines
6 KiB
Python
188 lines
6 KiB
Python
#!/usr/bin/python3
|
|
|
|
import datetime
|
|
import sys
|
|
import json
|
|
from base64 import b64encode
|
|
from pathlib import Path
|
|
from argparse import ArgumentParser
|
|
from subprocess import Popen, PIPE
|
|
import subprocess
|
|
import tempfile
|
|
import logging
|
|
|
|
import requests
|
|
|
|
|
|
def setup_logging():
    """
    Send log records to a freshly created file under ~/.local/share/carichello/.

    The directory is created if missing. Every call reserves a new, uniquely
    named file whose name starts with the current date and time and ends in
    ".txt", then hands that file name to logging.basicConfig at DEBUG level.
    """
    log_directory = Path("~/.local/share/carichello/").expanduser()
    log_directory.mkdir(parents=True, exist_ok=True)

    stamp = datetime.datetime.now().strftime("%Y-%m-%d_%H:%M")
    # NamedTemporaryFile is only used to reserve a unique name; delete=False
    # keeps the file on disk once this handle is closed.
    with tempfile.NamedTemporaryFile(
        dir=log_directory, prefix=f"{stamp}-", suffix=".txt", delete=False
    ) as reserved:
        log_path = reserved.name

    logging.basicConfig(filename=log_path, filemode="a", level=logging.DEBUG)
|
|
|
|
|
|
# Configure file logging as soon as the module is loaded, then expose the
# root logger under a module-wide name.
setup_logging()
LOG = logging.getLogger()
|
|
|
|
|
|
class RcloneBackend:
    """
    Minimal wrapper around the ``rclone`` command line for one remote.
    """

    def __init__(self, remote_name: str):
        """
        remote_name is the rclone remote to use, which you should have
        configured elsewhere. This class builds destinations as
        ``<remote_name>:<path>``, i.e. it adds the separating colon itself.
        """
        self.remote_name = remote_name

    def actual_path(self, path: bytes) -> bytes:
        """
        Translate a caller-supplied path into the path used on the remote.

        The base backend uses the path unchanged; subclasses may override this
        (for example to prepend a prefix). exists() relies on this hook, so it
        has to be available on the base class too.
        """
        return path

    def _destination(self, remote_path: bytes) -> bytes:
        """
        Build the ``remote:path`` argument handed to rclone, as bytes.
        """
        return b"%s:%s" % (self.remote_name.encode("utf8"), remote_path)

    def exists(self, destination_path: bytes) -> bool:
        """
        Tell whether something is already stored at destination_path.

        The path goes through actual_path() before being listed with
        ``rclone lsjson``. Returns True when the listing is non-empty.

        raises subprocess.CalledProcessError when rclone fails with an exit
        status other than 3
        """
        destination = self._destination(self.actual_path(destination_path))
        cmd = ["rclone", "--quiet", "lsjson", destination]
        try:
            output = subprocess.check_output(cmd)
        except subprocess.CalledProcessError as exc:
            # rclone documents exit status 3 as "directory not found": treat
            # it as "not there yet" instead of as a failure.
            if exc.returncode == 3:
                return False
            raise

        return bool(json.loads(output))

    def copy(self, filename: Path, destination_path: bytes):
        """
        Upload the local file to destination_path with ``rclone copyto``.

        destination_path is used as given: actual_path() is NOT applied here,
        so a caller that needs the translation must do it beforehand.

        raises in case of errors
        """
        destination = self._destination(destination_path)
        cmd = ["rclone", "--quiet", "copyto", str(filename), destination]
        subprocess.run(cmd, check=True)
|
|
|
|
|
|
class ArkiwiBackend(RcloneBackend):
    """
    Rclone backend that stores files under a fixed prefix and knows how to
    build the public arkiwi.org URL of an uploaded file.
    """

    # Seconds to wait for the server when checking the uploaded file; without
    # a timeout the HEAD request could block forever on a stalled connection.
    HEAD_TIMEOUT = 30

    def __init__(self, remote_name: str, prefix: str):
        """
        remote_name is passed to RcloneBackend; prefix is the directory every
        path is placed under (leading/trailing slashes are stripped).
        """
        super().__init__(remote_name)
        self.prefix: bytes = prefix.strip("/").encode("utf8")

    def actual_path(self, path: bytes) -> bytes:
        """
        Return path placed under the configured prefix, joined by one slash.
        """
        return self.prefix + b"/" + path.lstrip(b"/")

    def path_to_url(self, path: bytes) -> str:
        """
        Return the arkiwi.org redirect URL for path.

        The prefixed path is base64-encoded with the URL-safe alphabet and
        the "=" padding is removed.
        """
        # this is rfc4648 section 5
        encoded = b64encode(self.actual_path(path), altchars=b"-_")
        token = encoded.rstrip(b"=").decode("ascii")
        return f"https://www.arkiwi.org/path64/{token}/redirect"

    def copy(self, filename: Path, destination_path: bytes) -> str:
        """
        Upload filename under the prefix, then verify the upload by comparing
        the size reported by a HEAD request on its URL with the local size.

        returns the URL

        raises ValueError when the reported size is missing or differs from
        the local file size; errors from rclone and from the HTTP request
        (including timeouts) are propagated
        """
        super().copy(filename, self.actual_path(destination_path))
        url = self.path_to_url(destination_path)

        response = requests.head(
            url, allow_redirects=True, timeout=self.HEAD_TIMEOUT
        )
        response.raise_for_status()

        reported = response.headers.get("Content-Length")
        if reported is None:
            # Without the header there is nothing to compare against; fail
            # with an explicit message rather than a bare KeyError.
            raise ValueError(
                "the server did not report the size of the uploaded file"
            )
        length = int(reported)
        expected = filename.stat().st_size
        if length != expected:
            raise ValueError(
                "the uploaded file has a wrong size: %d instead of %d"
                % (length, expected)
            )
        return url
|
|
|
|
|
|
class Carichello:
    """
    Desktop front-end: uploads one file through ArkiwiBackend, reporting
    progress and results with zenity dialogs and putting the URL in the
    clipboard.
    """

    def __init__(self):
        self.parser = self.get_parser()

    def get_parser(self):
        """
        Build the command line parser: an optional --config path (defaulting
        to ~/.config/carichello/config.json) and the file to upload.
        """
        p = ArgumentParser()
        p.add_argument(
            "--config",
            type=Path,
            default=Path("~/.config/carichello/config.json").expanduser(),
        )
        p.add_argument("file", type=Path)
        return p

    @staticmethod
    def _markup_escape(text) -> str:
        """
        Escape "&", "<" and ">" so text can be embedded in the markup that is
        passed to zenity's --text option.
        """
        from html import escape

        return escape(str(text), quote=False)

    @staticmethod
    def _start_progress(*options: str) -> Popen:
        """
        Open a pulsating, auto-closing zenity progress dialog.

        The caller closes the returned process' stdin when the work is done.
        """
        return Popen(
            ["zenity", "--auto-close", "--progress", "--pulsate", *options],
            stdin=PIPE,
        )

    def error(self) -> int:
        """
        Log and display a generic error; returns the exit status 1.
        """
        LOG.error("generic error")
        subprocess.run(
            ["zenity", "--error", "--title=Errore caricamento", "--text=Errore!"]
        )
        return 1

    def error_exception(self, exc: Exception) -> int:
        """
        Log the exception being handled and display it; returns the exit
        status 1. Meant to be called from inside an ``except`` block, since
        it logs with LOG.exception.
        """
        LOG.exception("error")
        # The message is wrapped in <tt> markup, so it must be escaped or a
        # stray "<" / "&" in it would make the markup invalid.
        details = self._markup_escape(exc)
        subprocess.run(
            [
                "zenity",
                "--error",
                "--title=Errore caricamento",
                f"--text=Errore!\n\n<tt>{details}</tt>",
            ]
        )
        return 1

    def set_clipboard(self, text: str):
        """
        Feed text to ``xsel -bi``.
        """
        subprocess.run(["xsel", "-bi"], input=text.encode("utf8"))

    def run(self) -> int:
        """
        Parse the command line, load the configuration and upload the file.

        The destination is /<year>/<month>/<file name>. If something is
        already stored there the user is told so and nothing is uploaded.
        Otherwise the URL is put in the clipboard and the file is uploaded.

        returns 0 when the file has been uploaded, 1 otherwise
        """
        LOG.info("start")
        self.args = self.parser.parse_args()
        with self.args.config.open() as buf:
            self.config = json.load(buf)

        now = datetime.datetime.now()
        dest_directory = f"/{now.year}/{now.month}"
        dest_file = f"{dest_directory}/{self.args.file.name}".encode("utf8")
        backend = ArkiwiBackend(self.config["remote"], self.config["prefix"])
        url = backend.path_to_url(dest_file)
        LOG.info("file %s would be uploaded to %s", str(self.args.file), url)

        # File names are user-controlled and end up inside dialog markup.
        shown_name = self._markup_escape(self.args.file.name)

        progress = self._start_progress(
            "--title=Caricamento su Arkiwi",
            f"--text=Verifiche file {shown_name} in corso...",
        )
        try:
            exists = backend.exists(dest_file)
        except Exception as exc:
            # Any failure (rclone error, rclone not installed, unparsable
            # output) is shown to the user, as for the upload step below.
            progress.stdin.close()
            return self.error_exception(exc)
        progress.stdin.close()

        if exists:
            subprocess.run(
                [
                    "zenity",
                    "--info",
                    "--title=Caricamento su Arkiwi",
                    f"--text=File {shown_name} già presente:\n{url}",
                ]
            )
            return 1

        self.set_clipboard(url)
        progress = self._start_progress(
            f"--text=Caricamento file {shown_name} su arkiwi in corso...\n Copia l'indirizzo da usare <a href='#'>📋 </a>",
        )
        try:
            url = backend.copy(self.args.file, dest_file)
        except Exception as exc:
            # Dismiss the progress dialog before the error dialog appears.
            progress.stdin.close()
            return self.error_exception(exc)
        progress.stdin.close()

        LOG.info("ready: %s", url)
        subprocess.run(
            [
                "zenity",
                "--info",
                f"--text=Il file {shown_name} è stato caricato:\n\n{url}",
            ]
        )
        return 0
|
|
|
|
|
|
if __name__ == "__main__":
    # The value returned by run() becomes the process exit status.
    exit_status = Carichello().run()
    sys.exit(exit_status)
|