#!/usr/bin/env python3 #Playlistalo - simpatico script che legge le cartelle e genera la playlist import youtube_dl import shutil import sys import re import os import validators from glob import glob import json import time import subprocess import random import configparser from telegram.ext import Updater, MessageHandler, Filters from mastodon import Mastodon, StreamListener scriptpath = os.path.dirname(os.path.realpath(__file__)) #Crea le cartelle if not os.path.exists("playlist"): os.makedirs("playlist") if not os.path.exists("cache"): os.makedirs("cache") if not os.path.exists("fallback"): os.makedirs("fallback") if not os.path.exists("archive"): os.makedirs("archive") #Scrivi la prima configurazione configfile = 'playlistalo.conf' if not os.path.exists(configfile): config = configparser.ConfigParser() config['playlistalo'] = {'ShuffleUsers': False, 'ShuffleSongs': False, 'ShuffleFallback': False, 'Telegram_token': "", 'Mastodon_token': "", 'Mastodon_url': ""} with open(configfile, 'w') as f: config.write(f) #Leggi la configurazione config = configparser.ConfigParser() config.read(configfile) playlistaloconf = config['playlistalo'] SHUFFLEUSERS = playlistaloconf.getboolean('ShuffleUsers') SHUFFLESONGS = playlistaloconf.getboolean('ShuffleSongs') SHUFFLEFALLBACK = playlistaloconf.getboolean('ShuffleFallback') TELEGRAM_TOKEN = playlistaloconf.get('Telegram_token') MASTODON_TOKEN = playlistaloconf.get('Mastodon_token') MASTODON_URL = playlistaloconf.get('Mastodon_url') def add(url, user = "-unknown-"): #print ('--- Inizio ---') ydl_opts = { 'format': 'bestaudio[ext=m4a]', 'outtmpl': 'cache/%(id)s.m4a', 'noplaylist': True, 'quiet': True, } url = url.strip() print ("url: " + url) print ("user: " + user) if not validators.url(url): print ('--- URL malformato ---') return ("Err: url non valido") with youtube_dl.YoutubeDL(ydl_opts) as ydl: try: meta = ydl.extract_info(url, download = False) except youtube_dl.DownloadError as detail: print ('--- Errore video non disponibile ---') print(str(detail)) return ("Err: " + str(detail)) id = meta.get('id').strip() title = __normalizetext(meta.get('title')) print ('id: %s' %(id)) print ('title: %s' %(title)) #scrivo il json with open(os.path.join("cache", id + ".json"), 'w') as outfile: json.dump(meta, outfile, indent=4) #ho letto le info, ora controllo se il file esiste altrimenti lo scarico #miglioria: controllare se upload_date e' uguale a quella del json gia' esistente filetemp = os.path.join("cache", id + ".m4a") if not glob(filetemp): print ('--- Scarico ---') ydl.download([url]) #non ho capito perche' ma senza [] fa un carattere per volta if not os.path.isfile(filetemp): return("Err: file non scaricato") #se il file esiste gia' in playlist salto (potrebbe esserci, anche rinominato) if glob(scriptpath + "/playlist/**/*|" + id + ".*"): print ('--- File già presente ---') return ("Err: %s [%s] già presente" %(title, id)) if not os.path.exists("playlist/" + user): os.makedirs("playlist/" + user) #qui compone il nome del file if SHUFFLESONGS: fileout = str(random.randrange(10**6)).zfill(14) + "|" + title + "|" + id + ".m4a" else: fileout = time.strftime("%Y%m%d%H%M%S") + "|" + title + "|" + id + ".m4a" fileout = os.path.join("playlist/" + user, fileout) print ('--- Converto ---') print (fileout) subprocess.call([scriptpath + "/trimaudio.sh", filetemp, fileout]) if not os.path.isfile(fileout): return("Err: file non convertito") #cerca la posizione del pezzo appena inserito pos = getposition(fileout) print ('--- Fine ---') print ("") return ("OK: %s [%s] aggiunto alla playlist in posizione #%s" %(title, id, pos)) def __normalizetext(s): if s is None: return None else: s = re.sub(r'[\\|/|:|*|?|"|<|>|\|]',r'',s) s = " ".join(s.split()) return s def list(): pl = [] pl2 = [] for udir in sorted(glob(scriptpath + "/playlist/*/")): #print (udir) user = os.path.basename(os.path.dirname(udir)) #cerca il file last last = "" if os.path.exists(udir + "/last"): f = open(udir + "/last", "r") last = f.readline().rstrip() else: last = os.path.basename(sorted(glob(udir + "/*.m4a"))[0]).split("|")[0] #print ("LAST: " + last) #leggi i file nella cartella files = sorted(glob(udir + "/*.m4a")) seq = 0 for file in files: bn = os.path.splitext(os.path.basename(file))[0] #print ("BASENAME: " + bn) seq = seq + 1 dat = bn.split("|")[0] nam = bn.split("|")[1] cod = bn.split("|")[2] key = "-".join([str(seq).zfill(5), last, dat]) #print ("KEY: " + key) plsong = [key, file.replace(scriptpath + "/", "") , user, nam, cod] #, file pl.append(plsong) pl.sort() #rimuove la prima colonna, che serve solo per l'ordinamento pl2 = [x[1:] for x in pl] #print (pl) #print ('\n'.join([", ".join(x) for x in pl])) #print ('\n'.join([x[0] for x in pl])) return pl2 def listfallback(): pl = [] pl2 = [] #leggi i file nella cartella files = sorted(glob(scriptpath + "/fallback/*.m4a")) seq = 0 for file in files: bn = os.path.splitext(os.path.basename(file))[0] seq = seq + 1 dat = bn.split("|")[0] nam = bn.split("|")[1] cod = bn.split("|")[2] key = "-".join([str(seq).zfill(5), dat]) plsong = [key, file.replace(scriptpath + "/", "") , "fallback", nam, cod] #, file pl.append(plsong) pl.sort() #rimuove la prima colonna, che serve solo per l'ordinamento pl2 = [x[1:] for x in pl] return pl2 def playsingle(): pl = list() #print ('\n'.join([x[0] for x in pl])) if pl: firstsong = scriptpath + "/" + pl[0][0] print (firstsong) #qui fa play subprocess.call(["mplayer", "-nolirc", "-msglevel", "all=0:statusline=5", firstsong]) #alla fine consuma os.rename(firstsong, scriptpath + "/archive/" + os.path.basename(firstsong)) #se non ci sono + file cancella la cartella if not glob(os.path.dirname(firstsong) + "/*.m4a"): shutil.rmtree(os.path.dirname(firstsong)) else: with open(os.path.dirname(firstsong) + "/last", "w") as f: f.write(time.strftime("%Y%m%d%H%M%S")) else: #usa il fallback #eventualmente aggiungere file di avviso con istruzioni veloci plf = listfallback() #print ('\n'.join([x[0] for x in plf])) if plf: firstsong = plf[0][0] print (firstsong) #qui fa play subprocess.call(["mplayer", "-nolirc", "-msglevel", "all=0:statusline=5", firstsong]) #alla fine consuma fname = time.strftime("%Y%m%d%H%M%S") + "|" + "|".join(os.path.basename(firstsong).split("|")[1:]) fname = os.path.dirname(firstsong) + "/" + fname os.rename(firstsong, fname) def playloop(): while True: playsingle() def clean(): #cancella tutto dalla playlist shutil.rmtree("playlist") os.makedirs("playlist") def shuffleusers(): #scrivere un numero casuale dentro a tutti i file last for udir in sorted(glob("playlist/*/")): #print (udir) with open(udir + "/last", "w") as f: f.write(str(random.randrange(10**6)).zfill(14)) def shufflefallback(): #rinominare con un numero casuale i file in fallback files = sorted(glob("fallback/*.m4a")) for file in files: fname = str(random.randrange(10**6)).zfill(14) + "|" + "|".join(os.path.basename(file).split("|")[1:]) fname = os.path.dirname(file) + "/" + fname os.rename(file, fname) def getposition(file): pl = list() try: return([x[0] for x in pl].index(file) + 1) except: pass def telegram_msg_parser(bot, update): print("Messaggio ricevuto") msg = update.message.text id = str(update.message.from_user.id) username = update.message.from_user.username urls = re.findall('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', msg) user = "t_" + "-".join([i for i in [id, username] if i]) #print (urls) #print (user) if not urls: update.message.reply_text("Non ho trovato indirizzi validi...") return() update.message.reply_text("Messaggio ricevuto. Elaboro...") for url in urls: #update.message.reply_text("Scarico %s" %(url)) # start the download dl = add(url, user) update.message.reply_text(dl) def telegram_bot(): print ("Bot avviato") # Create the EventHandler and pass it your bot's token. updater = Updater(TELEGRAM_TOKEN) # Get the dispatcher to register handlers dp = updater.dispatcher # parse message dp.add_handler(MessageHandler(Filters.text, telegram_msg_parser)) # Start the Bot updater.start_polling() # Run the bot until you press Ctrl-C updater.idle() class MastodonListener(StreamListener): # andiamo a definire il metodo __init__, che prende una istanza di Mastodon come parametro opzionale e lo setta nella prop. self.mastodon def __init__(self, mastodonInstance=None): self.mastodon = mastodonInstance def on_notification(self, notification): print("Messaggio ricevuto") #try: msg = notification["status"]["content"] id = str(notification["account"]["id"]) username = notification["account"]["acct"] #except KeyError: # return msg = msg.replace("
", "\n") msg = re.compile(r'<.*?>').sub('', msg) urls = re.findall('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', msg) user = "m_" + "-".join([i for i in [id, username] if i]) #print (urls) #print (user) #visibility = notification['status']['visibility'] statusid = notification['status']['id'] if not urls: self.mastodon.status_post("@" + username + " " + "Non ho trovato indirizzi validi...", in_reply_to_id = statusid, visibility="direct") return() self.mastodon.status_post("@" + username + " " + "Messaggio ricevuto. Elaboro...", in_reply_to_id = statusid, visibility="direct") for url in urls: # start the download dl = add(url, user) self.mastodon.status_post("@" + username + " " + dl, in_reply_to_id = statusid, visibility="direct") def mastodon_bot(): print ("Bot avviato") mastodon = Mastodon(access_token = MASTODON_TOKEN, api_base_url = MASTODON_URL) listener = MastodonListener(mastodon) mastodon.stream_user(listener) if __name__ == '__main__': print ("This is a package, use other commands to run it")