2024-10-22 00:51:25 +02:00
#!/usr/bin/python3
import datetime
import sys
import json
from base64 import b64encode
from pathlib import Path
from argparse import ArgumentParser
from subprocess import Popen , PIPE
import subprocess
import tempfile
import logging
import requests
def setup_logging ( ) :
logdir = Path ( " ~/.local/share/carichello/ " ) . expanduser ( )
logdir . mkdir ( exist_ok = True , parents = True )
today = datetime . datetime . now ( ) . strftime ( " % Y- % m- %d _ % H: % M " )
logfile = tempfile . NamedTemporaryFile (
dir = logdir , prefix = f " { today } - " , suffix = " .txt " , delete = False
)
logging . basicConfig ( filename = logfile . name , filemode = " a " , level = logging . DEBUG )
logfile . close ( )
setup_logging ( )
LOG = logging . getLogger ( )
class RcloneBackend :
def __init__ ( self , remote_name : str ) :
"""
remote_name should be something like : myarchivedirectory : , which you should have configured elsewhere
"""
self . remote_name = remote_name
2024-10-22 13:11:47 +02:00
def actual_path ( self , destination_path : str ) :
return destination_path
2024-10-22 00:51:25 +02:00
def exists ( self , destination_path : bytes ) :
2024-10-22 13:11:47 +02:00
destination = b " %s : %s " % (
self . remote_name . encode ( " utf8 " ) ,
self . actual_path ( destination_path ) ,
)
2024-10-22 00:51:25 +02:00
cmd = [ " rclone " , " --quiet " , " lsjson " , destination ]
try :
output = subprocess . check_output ( cmd )
except subprocess . CalledProcessError as exc :
if exc . returncode == 3 :
return False
raise
data = json . loads ( output )
return bool ( data )
def copy ( self , filename : Path , destination_path : bytes ) :
"""
raises in case of errors
"""
destination = b " %s : %s " % ( self . remote_name . encode ( " utf8 " ) , destination_path )
cmd = [ " rclone " , " --quiet " , " copyto " , str ( filename ) , destination ]
subprocess . run ( cmd , check = True )
2024-10-22 13:11:47 +02:00
class ArchiveBackend ( RcloneBackend ) :
2024-11-17 00:11:49 +01:00
def __init__ ( self , accesskey : str , secret : str ) :
super ( ) . __init__ ( self )
self . accesskey = accesskey
self . secret = secret
def exists ( self , destination_path : bytes ) :
return False
def copy ( self , filename : Path , destination_path : bytes ) :
"""
destination_path is ignored
"""
bucketbase = filename . name . rsplit ( " . " , maxsplit = 1 ) [ 0 ] . replace ( " . " , " _ " )
if not bucketbase . startswith ( " ror- " ) :
bucketbase = " ror- " + bucketbase
bucketname = bucketbase
auth = { " authorization " : f " LOW { self . accesskey } : { self . secret } " }
print ( auth )
for attempt in range ( 5 ) :
print ( " trying " , bucketname )
resp = requests . put (
f " https://s3.us.archive.org/ { bucketname } " ,
headers = auth ,
)
try :
resp . raise_for_status ( )
except requests . HTTPError :
print ( resp . request . headers )
bucketname = f " { bucketbase } - { attempt + 1 } "
continue
else :
break
else :
raise ValueError ( " could not find a good bucket for " )
print ( " Found good bucket: %s " , bucketname )
url = f " https://s3.us.archive.org/ { bucketname } / { filename . name } "
# XXX: set some more header based on file metadata (date, title, etc.)
headers = {
" x-archive-meta01-collection " : " opensource " ,
" x-archive-meta-language " : " ita " ,
}
with filename . open ( " rb " ) as buf :
resp = requests . put (
url ,
data = buf ,
headers = { * * headers , * * auth } ,
)
resp . raise_for_status ( )
return f " https://archive.org/download/ { bucketname } / { filename . name } "
2024-10-22 13:11:47 +02:00
2024-10-22 00:51:25 +02:00
class ArkiwiBackend ( RcloneBackend ) :
def __init__ ( self , remote_name : str , prefix : str ) :
super ( ) . __init__ ( remote_name )
self . prefix : bytes = prefix . strip ( " / " ) . encode ( " utf8 " )
2024-11-05 11:11:11 +01:00
def ftp_path ( self , path : bytes ) - > bytes :
return b " ftp://upload.arkiwi.org/ %s / " % ( destination_path . strip ( b " / " ) , ) )
2024-10-22 00:51:25 +02:00
def actual_path ( self , path : bytes ) - > bytes :
return self . prefix + b " / " + path . lstrip ( b " / " )
def path_to_url ( self , path : bytes ) - > str :
# this is rfc4648 section 5
path = (
2024-10-22 13:51:00 +02:00
b64encode ( self . actual_path ( path ) , altchars = b " -_ " )
2024-10-22 00:51:25 +02:00
. rstrip ( b " = " )
. decode ( " ascii " )
)
return f " https://www.arkiwi.org/path64/ { path } /redirect "
2024-11-05 11:11:11 +01:00
#2024-11-05: cambio da rclone a curl per girare intorno ai bug di webdav. Poi bisogna togliere l'intero metodo, così torniamo a quello di RcloneBackend
2024-10-22 13:11:47 +02:00
def exists ( self , destination_path : bytes ) :
2024-11-05 11:11:11 +01:00
cmd = [ " curl " , " --netrc " , " --head " , " --silent " , self . ftp_path ( destination_path ) ]
try :
output = subprocess . check_output ( cmd )
except subprocess . CalledProcessError as exc :
if exc . returncode == 3 :
return False
raise
return bool ( output )
2024-10-22 13:11:47 +02:00
2024-10-22 00:51:25 +02:00
def copy ( self , filename : Path , destination_path : bytes ) - > str :
"""
returns the URL
"""
2024-11-05 11:11:11 +01:00
# 2024-11-05: siccome webdav è rotto e invece FTP funziona, sostituisco la copia webdav (rclone) con una fatta con curl
# super().copy(filename, self.actual_path(destination_path))
cmd = [ " curl " , " --netrc " , " --upload-file " , str ( filename ) , self . ftp_path ( destination_path ) ]
subprocess . run ( cmd , check = True )
2024-10-22 00:51:25 +02:00
url = self . path_to_url ( destination_path )
response = requests . head ( url , allow_redirects = True )
response . raise_for_status ( )
length = int ( response . headers [ " Content-Length " ] )
expected = filename . stat ( ) . st_size
if length != expected :
raise ValueError (
" the uploaded file has a wrong size: %d instead of %d "
% ( length , expected )
)
return url
2024-10-22 13:11:47 +02:00
BACKENDS = {
" arkiwi.org " : ArkiwiBackend ,
" archive.org " : ArchiveBackend ,
" default " : ArkiwiBackend ,
}
2024-10-22 00:51:25 +02:00
class Carichello :
def __init__ ( self ) :
self . parser = self . get_parser ( )
def get_parser ( self ) :
p = ArgumentParser ( )
p . add_argument (
" --config " ,
type = Path ,
default = Path ( " ~/.config/carichello/config.json " ) . expanduser ( ) ,
)
2024-10-28 17:03:13 +01:00
p . add_argument ( " file " , type = Path , nargs = ' ? ' )
2024-10-22 00:51:25 +02:00
return p
def error ( self ) - > int :
LOG . error ( " generic error " )
2024-10-22 13:51:00 +02:00
subprocess . run (
[ " zenity " , " --error " , " --title=Errore caricamento " , " --text=Errore! " ]
)
2024-10-22 00:51:25 +02:00
return 1
def error_exception ( self , exc : Exception ) - > int :
LOG . exception ( " error " )
2024-10-22 13:51:00 +02:00
subprocess . run (
[
" zenity " ,
" --error " ,
" --title=Errore caricamento " ,
f " --text=Errore! \n \n <tt> { exc } </tt> " ,
]
)
2024-10-22 00:51:25 +02:00
return 1
def set_clipboard ( self , text : str ) :
subprocess . run ( [ " xsel " , " -bi " ] , input = text . encode ( " utf8 " ) )
def run ( self ) - > int :
LOG . info ( " start " )
self . args = self . parser . parse_args ( )
with self . args . config . open ( ) as buf :
self . config = json . load ( buf )
2024-10-22 13:11:47 +02:00
self . config . setdefault ( " type " , " default " )
BackendCls = BACKENDS [ self . config [ " type " ] ]
2024-10-28 17:03:13 +01:00
if self . args . file is None :
output = subprocess . check_output (
[
" zenity " ,
" --file-selection " ,
" --file-filter=Audio files | *.ogg | *.oga | *.opus " ,
" --title=Seleziona il file da caricare " ,
] ,
)
if not output :
return 1
self . args . file = Path ( output )
if not self . args . file . exists ( ) :
subprocess . run ( [
" zenity " , " --error " , " --text=Il file selezionato non esiste "
] )
return 1
2024-10-22 00:51:25 +02:00
now = datetime . datetime . now ( )
dest_directory = f " / { now . year } / { now . month } "
dest_file = f " { dest_directory } / { self . args . file . name } " . encode ( " utf8 " )
2024-11-17 00:11:49 +01:00
backend = BackendCls ( * * self . config . get ( " config " , { } ) )
2024-10-22 13:11:47 +02:00
if hasattr ( backend , " path_to_url " ) :
url = backend . path_to_url ( dest_file )
LOG . info ( " file %s would be uploaded to %s " , str ( self . args . file ) , url )
else :
url = None
LOG . info ( " file %s would be uploaded " , str ( self . args . file ) )
2024-10-22 00:51:25 +02:00
zenity = Popen (
[
" zenity " ,
" --auto-close " ,
" --progress " ,
" --pulsate " ,
" --title=Caricamento su Arkiwi " ,
f " --text=Verifiche file { self . args . file . name } in corso... " ,
] ,
stdin = PIPE ,
)
try :
exists = backend . exists ( dest_file )
zenity . stdin . close ( )
if exists :
subprocess . run (
[
" zenity " ,
" --info " ,
" --title=Caricamento su Arkiwi " ,
f " --text=File { self . args . file . name } già presente: \n { url } " ,
]
)
return 1
except subprocess . CalledProcessError as exc :
zenity . stdin . close ( )
return self . error_exception ( exc )
2024-11-17 00:12:19 +01:00
if url :
self . set_clipboard ( url )
text = f " Caricamento file { self . args . file . name } su { self . config [ ' type ' ] } in corso... \n Copia l ' indirizzo da usare <a href= ' # ' >📋 </a> "
else :
text = f " Caricamento file { self . args . file . name } su { self . config [ ' type ' ] } in corso... "
2024-10-22 00:51:25 +02:00
zenity = Popen (
[
" zenity " ,
" --auto-close " ,
" --progress " ,
" --pulsate " ,
2024-11-17 00:12:19 +01:00
f " --text= { text } " ,
2024-10-22 00:51:25 +02:00
] ,
stdin = PIPE ,
)
try :
url = backend . copy ( self . args . file , dest_file )
except Exception as exc :
return self . error_exception ( exc )
finally :
zenity . stdin . close ( )
LOG . info ( " ready: %s " , url )
2024-10-22 13:51:00 +02:00
subprocess . run (
[
" zenity " ,
" --info " ,
f " --text=Il file { self . args . file . name } è stato caricato: \n \n { url } " ,
]
)
2024-10-22 00:51:25 +02:00
return 0
if __name__ == " __main__ " :
sys . exit ( Carichello ( ) . run ( ) )