#!/usr/bin/env python3
"""Playlistalo - a friendly script that reads the user folders and builds the playlist.

Requirements: yt_dlp (drop-in for youtube_dl), Mastodon.py,
python-telegram-bot, validators, python-musicpd; pydub is imported lazily by
``trimaudio``.

Working folders (all relative to the current directory):

* ``playlist/<user>/`` - queued songs, one folder per user, plus an optional
  ``last`` marker file used for ordering users.
* ``fallback/``        - songs used when the playlist does not fill the request.
* ``cache/``           - downloaded audio and its JSON metadata.
* ``archive/``         - converted songs kept for reuse (when ``Archive`` is on).
* ``announce/``        - optional jingles and the ``last`` announce counter.
* ``mpd/``             - copies of the songs handed to MPD.

Song files are named ``<sortkey>|<artist - track>|<title>|<id>.<ext>``.
"""

# import youtube_dl
import argparse
import configparser
import hashlib
import html
import json
import os
import random
import re
import shutil
import subprocess
import sys
import time
from glob import glob
from urllib.parse import urlparse, parse_qs

import validators
import yt_dlp as youtube_dl
from mastodon import Mastodon, StreamListener
from musicpd import MPDClient
from telegram import Update
from telegram.ext import Application, ContextTypes, MessageHandler, filters

scriptpath = os.path.dirname(os.path.realpath(__file__))

# Create the working folders
os.makedirs("playlist", exist_ok=True)
os.makedirs("fallback", exist_ok=True)

SHUFFLEUSERS = False
SHUFFLESONGS = False
SHUFFLEFALLBACK = False
ARCHIVE = True
TELEGRAM_TOKEN = ""
MASTODON_TOKEN = ""
MASTODON_URL = ""
ANNOUNCEREPEAT = 5

# Write the initial configuration
configfile = 'playlistalo.conf'
if not os.path.exists(configfile):
    config = configparser.ConfigParser()
    config['playlistalo'] = {
        'ShuffleUsers': SHUFFLEUSERS,
        'ShuffleSongs': SHUFFLESONGS,
        'ShuffleFallback': SHUFFLEFALLBACK,
        'Archive': ARCHIVE,
        'Telegram_token': TELEGRAM_TOKEN,
        'Mastodon_token': MASTODON_TOKEN,
        'Mastodon_url': MASTODON_URL,
    }
    with open(configfile, 'w') as f:
        config.write(f)

# Read the configuration
config = configparser.ConfigParser()
config.read(configfile)
if 'playlistalo' not in config:
    # A config file without the section would raise KeyError below;
    # fall back to the built-in defaults instead.
    config['playlistalo'] = {}
playlistaloconf = config['playlistalo']

SHUFFLEUSERS = playlistaloconf.getboolean('ShuffleUsers', SHUFFLEUSERS)
SHUFFLESONGS = playlistaloconf.getboolean('ShuffleSongs', SHUFFLESONGS)
SHUFFLEFALLBACK = playlistaloconf.getboolean('ShuffleFallback', SHUFFLEFALLBACK)
ARCHIVE = playlistaloconf.getboolean('Archive', ARCHIVE)
TELEGRAM_TOKEN = playlistaloconf.get('Telegram_token', TELEGRAM_TOKEN)
MASTODON_TOKEN = playlistaloconf.get('Mastodon_token', MASTODON_TOKEN)
MASTODON_URL = playlistaloconf.get('Mastodon_url', MASTODON_URL)

# URL matcher shared by both bots (raw string: the pattern contains "\(").
_URL_RE = re.compile(
    r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'
)
# Characters stripped from metadata before it is used in file names.
_ILLEGAL_CHARS_RE = re.compile(r'[\\|/|:|*|?|"|<|>|\|]')
# Any HTML tag, and the tags that stand for a line break.
_TAG_RE = re.compile(r'<.*?>')
_LINEBREAK_TAG_RE = re.compile(r'<br\s*/?>|</p>')
# An id is only trusted for cache lookups when it is made of these characters.
_VIDEO_ID_RE = re.compile(r'[A-Za-z0-9_-]+')


def video_id(value):
    """Extract the YouTube video id from a URL, or return None.

    Examples:
    - http://youtu.be/SA2iWivDJiE
    - http://www.youtube.com/watch?v=_oPAwA_Udwc&feature=feedu
    - http://www.youtube.com/embed/SA2iWivDJiE
    - http://www.youtube.com/v/SA2iWivDJiE?version=3&hl=en_US

    The id is later used to build cache file names, so anything that is not
    plain ``[A-Za-z0-9_-]`` is rejected (the caller then asks the downloader
    for the real id).
    """
    query = urlparse(value)
    candidate = None
    if query.hostname == 'youtu.be':
        candidate = query.path[1:]
    elif query.hostname in ('www.youtube.com', 'm.youtube.com', 'youtube.com'):
        if query.path == '/watch':
            # .get(): a /watch URL without "v" must not raise KeyError
            candidate = parse_qs(query.query).get('v', [None])[0]
        elif query.path[:7] == '/embed/' or query.path[:3] == '/v/':
            candidate = query.path.split('/')[2]
    if candidate and _VIDEO_ID_RE.fullmatch(candidate):
        return candidate
    # fail?
    return None


def _load_cached_info(vid):
    """Return the cached metadata dict for *vid*, or None when it is unusable.

    Both ``cache/<vid>.json`` and ``cache/<vid>.m4a`` must exist; an
    unreadable or corrupt JSON file counts as a cache miss.
    """
    jsonfile = os.path.join("cache", vid + ".json")
    audiofile = os.path.join("cache", vid + ".m4a")
    if not (os.path.isfile(jsonfile) and os.path.isfile(audiofile)):
        return None
    try:
        with open(jsonfile) as infile:
            return json.load(infile)
    except (OSError, ValueError):
        return None


def _describe(vid, meta):
    """Print id/title and return (title, track, artist, album), normalised."""
    title = normalizetext(meta.get('title'))
    track = normalizetext(meta.get('track'))
    artist = normalizetext(meta.get('artist'))
    album = normalizetext(meta.get('album'))
    print('id: %s' % (vid))
    print('title: %s' % (title))
    return title, track, artist, album


def addurl(url, user="-unknown-"):
    """Download the audio behind *url* (if not cached) and queue it for *user*.

    Returns a status string starting with ``"OK: "`` or ``"Err: "``.
    """
    if not user:
        # the CLI passes None when -u is omitted
        user = "-unknown-"
    os.makedirs("cache", exist_ok=True)

    ydl_opts = {
        'format': 'bestaudio[ext=m4a]',
        'outtmpl': 'cache/%(id)s.m4a',
        # 'no-cache-dir' is the command-line spelling and is ignored by the
        # Python API; the YoutubeDL option is 'cachedir'.
        'cachedir': False,
        'noplaylist': True,
        'quiet': True,
    }

    url = url.strip()
    print("url: " + url)
    print("user: " + user)

    if not validators.url(url):
        print('--- URL malformato ---')
        return "Err: url non valido"

    # Try to find the id from the URL and reuse the cached json + audio
    vid = video_id(url)
    meta = _load_cached_info(vid) if vid else None

    if meta is not None:
        title, track, artist, album = _describe(vid, meta)
        filetemp = os.path.join("cache", vid + ".m4a")
    else:
        with youtube_dl.YoutubeDL(ydl_opts) as ydl:
            try:
                meta = ydl.extract_info(url, download=False)
            except youtube_dl.DownloadError as detail:
                print('--- Errore video non disponibile ---')
                print(str(detail))
                return "Err: " + str(detail)

            vid = meta.get('id').strip()
            title, track, artist, album = _describe(vid, meta)

            # Write the json; sanitize_info (yt-dlp) makes the dict
            # JSON-serialisable, plain youtube_dl does not provide it.
            sanitize = getattr(ydl, 'sanitize_info', None)
            with open(os.path.join("cache", vid + ".json"), 'w') as outfile:
                json.dump(sanitize(meta) if sanitize else meta, outfile, indent=4)

            # Metadata is known: download the file unless it is already there.
            # Possible improvement: compare upload_date with the cached json.
            filetemp = os.path.join("cache", vid + ".m4a")
            if not os.path.isfile(filetemp):
                print('--- Scarico ---')
                try:
                    # download() expects a list of URLs; a bare string would
                    # be iterated one character at a time
                    ydl.download([url])
                except youtube_dl.DownloadError as detail:
                    print(str(detail))
                    return "Err: " + str(detail)

            if not os.path.isfile(filetemp):
                return "Err: file non scaricato"

    return add(filetemp, user, title, vid, track, artist, album)


def md5(fname):
    """Return the hexadecimal MD5 digest of the file *fname*."""
    hash_md5 = hashlib.md5()
    with open(fname, "rb") as f:
        for chunk in iter(lambda: f.read(4096), b""):
            hash_md5.update(chunk)
    return hash_md5.hexdigest()


def add(filetemp, user="-unknown-", title=None, id=None, track=None, artist=None, album=None):
    """Convert *filetemp* and place it in ``playlist/<user>/``.

    Missing metadata is derived from the file: the id defaults to the MD5 of
    the file and the title to its base name. *album* is accepted but not used
    in the file name. Returns a status string starting with ``"OK: "`` or
    ``"Err: "``.
    """
    if not user:
        # the CLI passes None when -u is omitted
        user = "-unknown-"
    if not id:
        id = md5(filetemp)
    if not title:
        # "|" separates the fields of the final file name, so strip it
        # (and the other illegal characters) from a name-derived title
        title = normalizetext(os.path.splitext(os.path.basename(filetemp))[0])
    track = track or ''
    artist = artist or ''

    # Skip if the song is already in the playlist (possibly renamed)
    if glob("playlist/**/*|" + id + ".*"):
        print('--- File già presente ---')
        return "Err: %s [%s] già presente" % (title, id)

    os.makedirs("playlist/" + user, exist_ok=True)

    # Compose the file name: <sortkey>|<artist - track>|<title>|<id>.m4a
    fnam = (artist + " - " + track).strip() + "|" + title + "|" + id + ".m4a"
    if SHUFFLESONGS:
        prefix = str(random.randrange(10**6)).zfill(14)
    else:
        prefix = time.strftime("%Y%m%d%H%M%S")
    fileout = os.path.join("playlist/" + user, prefix + "|" + fnam)

    # If the song is already in the archive reuse that copy
    archived = glob("archive/*|" + id + ".*")
    if archived:
        print('--- File in archivio ---')
        copyfile(archived[0], fileout)
    else:
        print('--- Converto ---')
        print(fileout)
        trimaudio(filetemp, fileout)
        print('--- Fine ---')

    if not os.path.isfile(fileout):
        return "Err: file non convertito"

    if ARCHIVE and not archived:
        os.makedirs("archive", exist_ok=True)
        shutil.copy2(fileout, "archive")

    # Look up the position of the song just inserted
    pos = getposition(fileout)
    return "OK: %s [%s] aggiunto alla playlist in posizione #%s" % (title, id, pos)


def normalizetext(s):
    """Strip file-name-hostile characters from *s* and collapse whitespace.

    Returns None when *s* is None.
    """
    if s is None:
        return None
    s = _ILLEGAL_CHARS_RE.sub(r'', s)
    return " ".join(s.split())


def _song_files(folder):
    """Return the sorted files of *folder*, excluding the ``last`` marker."""
    return [x for x in sorted(glob(os.path.join(folder, "*")))
            if os.path.basename(x) != "last"]


def _parse_song_name(bn):
    """Split an extension-less song name into (sortkey, title, id).

    Three fields are read as ``key|title|id`` and four or more as
    ``key|artist - track|title|id``. Names with fewer fields (e.g. a file
    dropped by hand into ``fallback``) use their last field as both title
    and id instead of raising IndexError.
    """
    parts = bn.split("|")
    if len(parts) >= 4:
        return parts[0], parts[2], parts[3]
    if len(parts) == 3:
        return parts[0], parts[1], parts[2]
    return parts[0], parts[-1], parts[-1]


def _restamp(basename, stamp):
    """Return *basename* with its leading sort key replaced by *stamp*.

    A name without any ``|`` has no sort key yet, so *stamp* is prepended and
    the whole name (extension included) is kept.
    """
    parts = basename.split("|")
    rest = parts[1:] if len(parts) > 1 else parts
    return stamp + "|" + "|".join(rest)


def listplaylist():
    """Return the queued songs as ``[file, user, title, id]`` lists, in play order.

    Songs are interleaved across users: the sort key is the song's sequence
    number inside its user folder, then the user's ``last`` value (content of
    the ``last`` file, or the sort key of the user's first song), then the
    song's own sort key.
    """
    pl = []
    for udir in sorted(glob("playlist/*/")):
        user = os.path.basename(os.path.dirname(udir))
        files = _song_files(udir)

        # Find the "last" value for this user
        last = ""
        lastfile = os.path.join(udir, "last")
        if os.path.exists(lastfile):
            with open(lastfile, "r") as f:
                last = f.readline().rstrip()
        elif files:
            last = os.path.basename(files[0]).split("|")[0]

        # Read the files in the folder
        for seq, file in enumerate(files, start=1):
            bn = os.path.splitext(os.path.basename(file))[0]
            dat, nam, cod = _parse_song_name(bn)
            key = "-".join([str(seq).zfill(5), last, dat])
            pl.append([key, file, user, nam, cod])

    pl.sort()
    # Drop the first column, it is only needed for sorting
    return [x[1:] for x in pl]


def listfallback():
    """Return the fallback songs as ``[file, "fallback", title, id]`` lists."""
    pl = []
    # Read the files in the folder
    for seq, file in enumerate(_song_files("fallback"), start=1):
        bn = os.path.splitext(os.path.basename(file))[0]
        dat, nam, cod = _parse_song_name(bn)
        key = "-".join([str(seq).zfill(5), dat])
        pl.append([key, file, "fallback", nam, cod])

    pl.sort()
    # Drop the first column, it is only needed for sorting
    return [x[1:] for x in pl]


def playlocal():
    """Play songs forever with mplayer, consuming each one after playback."""
    if SHUFFLEUSERS:
        shuffleusers()
    if SHUFFLEFALLBACK:
        shufflefallback()

    while True:
        plt = listtot(1)
        if not plt:
            # nothing to play: wait instead of spinning at full CPU
            time.sleep(1)
            continue
        song = plt[0][0]
        print(song)
        # Play the song
        subprocess.call(["mplayer", "-nolirc", "-msglevel", "all=0:statusline=5", song])
        consume(song)


def clean():
    """Delete everything from the playlist."""
    shutil.rmtree("playlist")
    os.makedirs("playlist")


def shuffleusers():
    """Write a random number into every user's ``last`` file."""
    for udir in sorted(glob("playlist/*/")):
        with open(udir + "/last", "w") as f:
            f.write(str(random.randrange(10**6)).zfill(14))


def shufflefallback():
    """Rename the fallback files so each starts with a random sort key."""
    for file in _song_files("fallback"):
        stamp = str(random.randrange(10**6)).zfill(14)
        fname = _restamp(os.path.basename(file), stamp)
        os.rename(file, os.path.dirname(file) + "/" + fname)


def getposition(file):
    """Return the 1-based position of *file* in the playlist, or None."""
    try:
        return [x[0] for x in listplaylist()].index(file) + 1
    except ValueError:
        return None


async def telegram_msg_parser(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
    """Telegram handler: queue every URL found in the incoming message."""
    print("Messaggio ricevuto")
    message = update.message
    # Some updates carry no message, text or sender; ignore them instead
    # of raising AttributeError.
    if message is None or not message.text or message.from_user is None:
        return

    msg = message.text
    id = str(message.from_user.id)
    username = message.from_user.username
    urls = _URL_RE.findall(msg)
    user = "t_" + "-".join([i for i in [id, username] if i])

    if not urls:
        await message.reply_text("Non ho trovato indirizzi validi...")
        return

    await message.reply_text("Messaggio ricevuto. Elaboro...")

    for url in urls:
        # start the download
        dl = addurl(url, user)
        await message.reply_text(dl)


def telegram_bot():
    """Run the Telegram bot until interrupted."""
    print("Bot avviato")
    application = Application.builder().token(TELEGRAM_TOKEN).build()

    # on non command i.e message - handle the message on Telegram
    application.add_handler(MessageHandler(filters.TEXT, telegram_msg_parser))

    # Run the bot until the user presses Ctrl-C
    application.run_polling(allowed_updates=Update.ALL_TYPES)


class MastodonListener(StreamListener):
    """Stream listener that queues the URLs found in incoming notifications."""

    def __init__(self, mastodonInstance=None):
        """Store the optional Mastodon instance used to post replies."""
        self.mastodon = mastodonInstance

    def on_notification(self, notification):
        """Queue every URL in the notification's status and reply to its author."""
        print("Messaggio ricevuto")
        status = notification.get("status")
        if not status:
            # notifications without a status have nothing to parse
            return
        msg = status["content"]
        id = str(notification["account"]["id"])
        username = notification["account"]["acct"]

        # NOTE(review): the first argument of the original replace() call was
        # garbled into a bare line break; a "<br>" tag is presumed - confirm.
        # The content is treated as HTML: line-break tags become newlines so
        # adjacent text does not merge when the remaining tags are stripped.
        msg = _LINEBREAK_TAG_RE.sub("\n", msg)
        msg = _TAG_RE.sub('', msg)
        # "&amp;" and friends would otherwise end up inside the URLs
        msg = html.unescape(msg)
        urls = _URL_RE.findall(msg)
        user = "m_" + "-".join([i for i in [id, username] if i])
        statusid = status['id']

        if not urls:
            self.mastodon.status_post("@" + username + " " + "Non ho trovato indirizzi validi...",
                                      in_reply_to_id=statusid, visibility="direct")
            return

        self.mastodon.status_post("@" + username + " " + "Messaggio ricevuto. Elaboro...",
                                  in_reply_to_id=statusid, visibility="direct")

        for url in urls:
            # start the download
            dl = addurl(url, user)
            self.mastodon.status_post("@" + username + " " + dl,
                                      in_reply_to_id=statusid, visibility="direct")


def mastodon_bot():
    """Run the Mastodon bot on the user stream."""
    print("Bot avviato")
    mastodon = Mastodon(access_token=MASTODON_TOKEN, api_base_url=MASTODON_URL)
    listener = MastodonListener(mastodon)
    mastodon.stream_user(listener)


def listtot(res=sys.maxsize):
    """Return up to *res* upcoming entries: playlist first, then fallback.

    Fallback songs are added only when the playlist has fewer than *res*
    entries. While filling from the fallback, ``["announce/repeat.mp3"]`` is
    inserted whenever the announce counter reaches zero (and the file
    exists); the counter then restarts from ``ANNOUNCEREPEAT - 1``.
    """
    plt = listplaylist()
    # After a user song the announce is due immediately
    announcepos = 0 if plt else getlastannounce()

    if len(plt) < res:
        for x in listfallback():
            if len(plt) >= res:
                break
            # "<= 0": a counter that went negative while the jingle was
            # missing must still trigger once the file appears
            if announcepos <= 0:
                if os.path.exists("announce/repeat.mp3"):
                    plt.append(["announce/repeat.mp3"])
                announcepos = ANNOUNCEREPEAT - 1
            else:
                announcepos = announcepos - 1
            plt.append(x)
    # Always honour the requested size (the playlist-only case used to
    # return the whole list)
    return plt[:res]


def getlastannounce():
    """Return the stored announce counter, or ``ANNOUNCEREPEAT - 1`` if unreadable."""
    try:
        with open("announce/last", "r") as f:
            return int(f.readline().rstrip())
    except (OSError, ValueError):
        return ANNOUNCEREPEAT - 1


def setlastannounce(announcepos):
    """Store the announce counter in ``announce/last``."""
    # the folder is optional, so it may not exist yet
    os.makedirs("announce", exist_ok=True)
    with open("announce/last", "w") as f:
        f.write(str(announcepos))


def consume(song):
    """Mark *song* as played; does nothing if the file no longer exists.

    * playlist song: the file is deleted; the user folder is removed when
      empty, otherwise its ``last`` file gets the current timestamp; the
      announce counter is reset to 0.
    * fallback song: the file's sort key is replaced by the current
      timestamp and the announce counter is decremented.
    * announce: the announce counter is set to ``ANNOUNCEREPEAT``.
    """
    if not os.path.exists(song):
        return

    print("Consumo la canzone " + song)
    kind = song.split("/")[0]
    folder = os.path.dirname(song)

    if kind == "playlist":
        os.remove(song)
        remaining = [x for x in glob(folder + "/*") if os.path.basename(x) != "last"]
        if not remaining:
            shutil.rmtree(folder)
        else:
            with open(folder + "/last", "w") as f:
                f.write(time.strftime("%Y%m%d%H%M%S"))
        # Reset the announce counter
        setlastannounce(0)
    elif kind == "fallback":
        fname = _restamp(os.path.basename(song), time.strftime("%Y%m%d%H%M%S"))
        os.rename(song, folder + "/" + fname)
        announcepos = getlastannounce()
        print("Annuncio da " + str(announcepos) + " a " + str(announcepos - 1))
        setlastannounce(announcepos - 1)
    elif kind == "announce":
        setlastannounce(ANNOUNCEREPEAT)


def addstartannounce():
    """Queue the opening announce, if ``announce/start.mp3`` exists."""
    if os.path.exists("announce/start.mp3"):
        fileout = "playlist/announce/00000000000000|start|start.mp3"
        copyfile("announce/start.mp3", fileout)


def copyfile(source, dest):
    """Copy *source* to *dest* unless *dest* already exists, creating folders."""
    if os.path.exists(dest):
        return
    folder = os.path.dirname(dest)
    if folder:
        os.makedirs(folder, exist_ok=True)
    shutil.copy2(source, dest)


def _wait_mpd_update(client):
    """Block until the MPD status no longer reports ``updating_db``."""
    while 'updating_db' in client.status():
        # short pause so the server is not flooded with status requests
        time.sleep(0.1)


def plaympd():
    """Feed MPD (localhost:6600) with the upcoming songs, forever.

    Songs are copied below ``mpd/`` and added to the MPD queue under their
    original relative path.
    """
    client = MPDClient()
    client.timeout = 10        # network timeout in seconds (floats allowed), default: None
    client.idletimeout = None  # timeout for fetching the result of the idle command, default: None
    client.connect("localhost", 6600)

    looptime = 10
    synctime = 5
    listlen = 10

    if client.status()['state'] != "play":
        if SHUFFLEUSERS:
            shuffleusers()
        if SHUFFLEFALLBACK:
            shufflefallback()

        # Stop and empty the queue
        client.stop()
        client.clear()
        # Recreate the mpd folder
        if os.path.exists("mpd"):
            shutil.rmtree("mpd")
        os.makedirs("mpd")
        client.update()
        _wait_mpd_update(client)

        # Opening announce (disabled)
        # addstartannounce()

        # Reset the announce counter
        setlastannounce(ANNOUNCEREPEAT - 1)

        # Fill the queue
        plt = listtot(listlen)
        for f in plt:
            print(f[0])
            copyfile(f[0], "mpd/" + f[0])
        client.update()
        _wait_mpd_update(client)
        for f in plt:
            client.add(f[0])

        # Consume the first song and start playing
        if plt:
            consume(plt[0][0])
        client.play(0)

    while True:
        # NOTE(review): status['song'] and currentsong()['time'] are read
        # unconditionally; if MPD reports no current song this raises
        # KeyError - confirm the desired recovery before changing it.
        status = client.status()
        # If the current song is not the first one, consume it and drop
        # the ones before it
        if int(status['song']) > 0:
            song = client.playlistinfo()[int(status['song'])]['file']
            consume(song)
            mpdclean(client)

        # Resync the queue shortly before the current song ends
        # timeleft = float(status['duration']) - float(status['elapsed'])  # new mpd
        timeleft = float(client.currentsong()['time']) - float(status['elapsed'])  # old mpd
        if timeleft <= looptime + synctime:
            time.sleep(max(timeleft - synctime, 0))
            print("Mancano %d secondi" % (synctime))
            mpdclean(client)
            mpdadd(client, listlen)

        time.sleep(looptime)


def mpdclean(client):
    """Remove from MPD (and from ``mpd/``) every song before the current one."""
    for _ in range(int(client.status()['song'])):
        song = client.playlistinfo()[0]['file']
        consume(song)
        client.delete(0)
        # ...and clean up the copy under mpd/
        copy = "mpd/" + song
        if os.path.exists(copy):
            os.remove(copy)
        # Remove the folder when no files are left in it
        folder = os.path.dirname(copy)
        if os.path.isdir(folder) and not glob(folder + "/*"):
            shutil.rmtree(folder)


def mpdadd(client, listlen):
    """Resync the MPD queue with the upcoming songs.

    Queue entry 0 is left alone. The entries after it are compared
    position by position with the fresh list: the matching prefix is
    kept, everything after the first mismatch is deleted and re-added
    from the list.
    """
    print("Rigenero la playlist")
    plt = listtot(listlen)

    # Copy the files
    for f in plt:
        copyfile(f[0], "mpd/" + f[0])
    client.update()
    _wait_mpd_update(client)

    print("------------------")
    queue = client.playlistinfo()

    # Count how many upcoming entries already match. Comparing by position
    # (rather than plt.index) stays correct when the list contains
    # duplicates, e.g. repeated announce entries, and when the queue holds
    # a single song.
    keep = 0
    for wanted, queued in zip(plt, queue[1:]):
        if wanted[0] != queued['file']:
            break
        print("Mantengo " + wanted[0])
        keep += 1

    first_stale = keep + 1
    for entry in queue[first_stale:]:
        print("Cancello " + entry['file'])
        # indexes shift after every delete, so the position stays the same
        client.delete(first_stale)

    # ...and rebuild the rest of the queue
    for wanted in plt[keep:]:
        print("Aggiungo " + wanted[0])
        client.add(wanted[0])

    # Consume the first one
    if plt:
        consume(plt[0][0])


def trimaudio(fin, fout):
    """Trim, shorten and normalise *fin*, writing the result to *fout*.

    Keeps the audio up to the first long silence (3 s below -70 dBFS), cuts
    songs longer than 255 s to 240 s with a 10 s fade-out, and normalises
    the level to -20 dBFS.
    """
    from pydub import AudioSegment
    from pydub.silence import split_on_silence

    maxlen = 240
    tolerance = 15
    fade = 10

    audio = AudioSegment.from_file(fin, "m4a")

    def match_target_amplitude(sound, target_dBFS):
        change_in_dBFS = target_dBFS - sound.dBFS
        return sound.apply_gain(change_in_dBFS)

    # trim (an all-silent file yields no chunks: keep it untouched)
    chunks = split_on_silence(audio, min_silence_len=3000, silence_thresh=-70.0, seek_step=100)
    if chunks:
        audio = chunks[0]

    # fade
    if audio.duration_seconds > maxlen + tolerance:
        audio = audio[:maxlen * 1000].fade_out(fade * 1000)

    # normalize (pure silence has dBFS == -inf and cannot be scaled)
    if audio.dBFS != float("-inf"):
        audio = match_target_amplitude(audio, -20.0)

    audio.export(fout, format="mp4")


def main():
    """Command-line entry point."""
    parser = argparse.ArgumentParser('Playlistalo')
    subparsers = parser.add_subparsers(dest='command')

    parser_add = subparsers.add_parser('add', help='Add song from file')
    parser_add.add_argument('file', help='Song file')
    parser_add.add_argument('-u', '--user', help='User that sends the file', default=None)

    parser_addurl = subparsers.add_parser('addurl', help='Add song from url')
    parser_addurl.add_argument('url', help='Song url')
    parser_addurl.add_argument('-u', '--user', help='User that sends the file', default=None)

    subparsers.add_parser('list', help='List curent playlist')
    subparsers.add_parser('mastodon', help='Run Mastodon bot')
    subparsers.add_parser('telegram', help='Run Telegram bot')
    subparsers.add_parser('playlocal', help='Play songs locally')
    subparsers.add_parser('plaympd', help='Play songs using MPD')

    args = parser.parse_args()

    if args.command == 'add':
        print(add(args.file, args.user))
    elif args.command == 'addurl':
        print(addurl(args.url, args.user))
    elif args.command == 'list':
        pl = listtot()
        print('\n'.join([x[0] for x in pl]))
    elif args.command == 'mastodon':
        mastodon_bot()
    elif args.command == 'telegram':
        telegram_bot()
    elif args.command == 'playlocal':
        playlocal()
    elif args.command == 'plaympd':
        plaympd()
    else:
        parser.print_help()


if __name__ == '__main__':
    main()