playlistalo/playlistalo.py
2023-09-18 21:32:26 +02:00

737 lines
22 KiB
Python
Executable file

#!/usr/bin/env python3
#Playlistalo - simpatico script che legge le cartelle e genera la playlist
#Requirements: youtube_dl Mastodon.py python-telegram-bot validators python-musicpd
#import youtube_dl
import yt_dlp as youtube_dl
import shutil
import sys
import re
import os
import validators
from glob import glob
import json
import time
import subprocess
import random
import configparser
import hashlib
from urllib.parse import urlparse, parse_qs
import argparse
from musicpd import MPDClient
from telegram import Update
from telegram.ext import Application, ContextTypes, MessageHandler, filters
from mastodon import Mastodon, StreamListener
scriptpath = os.path.dirname(os.path.realpath(__file__))
#Crea le cartelle
os.makedirs("playlist", exist_ok=True)
os.makedirs("fallback", exist_ok=True)
SHUFFLEUSERS = False
SHUFFLESONGS = False
SHUFFLEFALLBACK = False
ARCHIVE = True
TELEGRAM_TOKEN = ""
MASTODON_TOKEN = ""
MASTODON_URL = ""
ANNOUNCEREPEAT = 5
#Scrivi la prima configurazione
configfile = 'playlistalo.conf'
if not os.path.exists(configfile):
config = configparser.ConfigParser()
config['playlistalo'] = {'ShuffleUsers': SHUFFLEUSERS, 'ShuffleSongs': SHUFFLESONGS, 'ShuffleFallback': SHUFFLEFALLBACK, 'Archive': ARCHIVE, 'Telegram_token': TELEGRAM_TOKEN, 'Mastodon_token': MASTODON_TOKEN, 'Mastodon_url': MASTODON_URL}
with open(configfile, 'w') as f:
config.write(f)
#Leggi la configurazione
config = configparser.ConfigParser()
config.read(configfile)
playlistaloconf = config['playlistalo']
SHUFFLEUSERS = playlistaloconf.getboolean('ShuffleUsers', SHUFFLEUSERS)
SHUFFLESONGS = playlistaloconf.getboolean('ShuffleSongs', SHUFFLESONGS)
SHUFFLEFALLBACK = playlistaloconf.getboolean('ShuffleFallback', SHUFFLEFALLBACK)
ARCHIVE = playlistaloconf.getboolean('Archive', ARCHIVE)
TELEGRAM_TOKEN = playlistaloconf.get('Telegram_token', TELEGRAM_TOKEN)
MASTODON_TOKEN = playlistaloconf.get('Mastodon_token',MASTODON_TOKEN)
MASTODON_URL = playlistaloconf.get('Mastodon_url', MASTODON_URL)
def video_id(value):
"""
Examples:
- http://youtu.be/SA2iWivDJiE
- http://www.youtube.com/watch?v=_oPAwA_Udwc&feature=feedu
- http://www.youtube.com/embed/SA2iWivDJiE
- http://www.youtube.com/v/SA2iWivDJiE?version=3&hl=en_US
"""
query = urlparse(value)
if query.hostname == 'youtu.be':
return query.path[1:]
if query.hostname in ('www.youtube.com', 'm.youtube.com', 'youtube.com'):
if query.path == '/watch':
p = parse_qs(query.query)
return p['v'][0]
if query.path[:7] == '/embed/':
return query.path.split('/')[2]
if query.path[:3] == '/v/':
return query.path.split('/')[2]
# fail?
return None
def addurl(url, user = "-unknown-"):
#print ('--- Inizio ---')
os.makedirs("cache", exist_ok=True)
ydl_opts = {
'format': 'bestaudio[ext=m4a]',
'outtmpl': 'cache/%(id)s.m4a',
'no-cache-dir': True,
'noplaylist': True,
'quiet': True,
}
url = url.strip()
print ("url: " + url)
print ("user: " + user)
if not validators.url(url):
print ('--- URL malformato ---')
return ("Err: url non valido")
#trova l'id dall'url
id = video_id(url)
if id:
#cerca se ho gia' il json e il file
if not (glob(os.path.join("cache", id + ".json")) and glob(os.path.join("cache", id + ".m4a"))):
id = None
else:
#legge le info
with open(os.path.join("cache", id + ".json")) as infile:
j = json.load(infile)
title = normalizetext(j.get('title'))
track = normalizetext(j.get('track'))
artist = normalizetext(j.get('artist'))
album = normalizetext(j.get('album'))
filetemp = os.path.join("cache", id + ".m4a")
print ('id: %s' %(id))
print ('title: %s' %(title))
if not id:
with youtube_dl.YoutubeDL(ydl_opts) as ydl:
try:
meta = ydl.extract_info(url, download = False)
except youtube_dl.DownloadError as detail:
print ('--- Errore video non disponibile ---')
print(str(detail))
return ("Err: " + str(detail))
id = meta.get('id').strip()
title = normalizetext(meta.get('title'))
track = normalizetext(meta.get('track'))
artist = normalizetext(meta.get('artist'))
album = normalizetext(meta.get('album'))
print ('id: %s' %(id))
print ('title: %s' %(title))
#scrivo il json
with open(os.path.join("cache", id + ".json"), 'w') as outfile:
json.dump(meta, outfile, indent=4)
#ho letto le info, ora controllo se il file esiste altrimenti lo scarico
#miglioria: controllare se upload_date e' uguale a quella del json gia' esistente
filetemp = os.path.join("cache", id + ".m4a")
if not glob(filetemp):
print ('--- Scarico ---')
ydl.download([url]) #non ho capito perche' ma senza [] fa un carattere per volta
if not os.path.isfile(filetemp):
return("Err: file non scaricato")
return(add(filetemp, user, title, id, track, artist, album))
def md5(fname):
hash_md5 = hashlib.md5()
with open(fname, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
hash_md5.update(chunk)
return hash_md5.hexdigest()
def add(filetemp, user = "-unknown-", title = None, id = None, track = None, artist = None, album = None):
if not id:
id = md5(filetemp)
if not title:
title = os.path.splitext(os.path.basename(filetemp))[0]
if not track:
track = ''
if not artist:
artist = ''
if not album:
album = ''
#se il file esiste gia' in playlist salto (potrebbe esserci, anche rinominato)
if glob("playlist/**/*|" + id + ".*"):
print ('--- File già presente ---')
return ("Err: %s [%s] già presente" %(title, id))
os.makedirs("playlist/" + user, exist_ok=True)
#qui compone il nome del file
fnam = (artist + " - " + track).strip() + "|" + title + "|" + id + ".m4a"
if SHUFFLESONGS:
fileout = str(random.randrange(10**6)).zfill(14) + "|" + fnam
else:
fileout = time.strftime("%Y%m%d%H%M%S") + "|" + fnam
fileout = os.path.join("playlist/" + user, fileout)
#se il file esiste gia' in archive prendo quello
if glob("archive/*|" + id + ".*"):
print ('--- File in archivio ---')
copyfile(glob("archive/*|" + id + ".*")[0], fileout)
else:
print ('--- Converto ---')
print (fileout)
trimaudio(filetemp, fileout)
print ('--- Fine ---')
if not os.path.isfile(fileout):
return("Err: file non convertito")
if ARCHIVE:
os.makedirs("archive", exist_ok=True)
if not glob("archive/*|" + id + ".*"):
shutil.copy2(fileout, "archive")
#cerca la posizione del pezzo appena inserito
pos = getposition(fileout)
return ("OK: %s [%s] aggiunto alla playlist in posizione #%s" %(title, id, pos))
def normalizetext(s):
if s is None:
return None
else:
s = re.sub(r'[\\|/|:|*|?|"|<|>|\|]',r'',s)
s = " ".join(s.split())
return s
def listplaylist():
pl = []
pl2 = []
for udir in sorted(glob("playlist/*/")):
#print (udir)
user = os.path.basename(os.path.dirname(udir))
#cerca il file last
last = ""
if os.path.exists(udir + "/last"):
f = open(udir + "/last", "r")
last = f.readline().rstrip()
else:
files = [x for x in sorted(glob(udir + "/*")) if not os.path.basename(x) == "last"]
if files:
last = os.path.basename(files[0]).split("|")[0]
#print ("LAST: " + last)
#leggi i file nella cartella
files = [x for x in sorted(glob(udir + "/*")) if not os.path.basename(x) == "last"]
seq = 0
for file in files:
bn = os.path.splitext(os.path.basename(file))[0]
#print ("BASENAME: " + bn)
seq = seq + 1
dat = bn.split("|")[0]
if len(bn.split("|")) < 4:
nam = bn.split("|")[1]
cod = bn.split("|")[2]
else:
nam = bn.split("|")[2]
cod = bn.split("|")[3]
key = "-".join([str(seq).zfill(5), last, dat])
#print ("KEY: " + key)
plsong = [key, file, user, nam, cod]
pl.append(plsong)
pl.sort()
#rimuove la prima colonna, che serve solo per l'ordinamento
pl2 = [x[1:] for x in pl]
#print (pl)
#print ('\n'.join([", ".join(x) for x in pl]))
#print ('\n'.join([x[0] for x in pl]))
return pl2
def listfallback():
pl = []
pl2 = []
#leggi i file nella cartella
files = [x for x in sorted(glob("fallback/*")) if not os.path.basename(x) == "last"]
seq = 0
for file in files:
bn = os.path.splitext(os.path.basename(file))[0]
seq = seq + 1
dat = bn.split("|")[0]
if len(bn.split("|")) < 4:
nam = bn.split("|")[1]
cod = bn.split("|")[2]
else:
nam = bn.split("|")[2]
cod = bn.split("|")[3]
key = "-".join([str(seq).zfill(5), dat])
plsong = [key, file, "fallback", nam, cod]
pl.append(plsong)
pl.sort()
#rimuove la prima colonna, che serve solo per l'ordinamento
pl2 = [x[1:] for x in pl]
return pl2
def playlocal():
if SHUFFLEUSERS:
shuffleusers()
if SHUFFLEFALLBACK:
shufflefallback()
while True:
plt = listtot(1)
if plt:
song = plt[0][0]
print(song)
#qui fa play
subprocess.call(["mplayer", "-nolirc", "-msglevel", "all=0:statusline=5", song])
consume(song)
def clean():
#cancella tutto dalla playlist
shutil.rmtree("playlist")
os.makedirs("playlist")
def shuffleusers():
#scrivere un numero casuale dentro a tutti i file last
for udir in sorted(glob("playlist/*/")):
#print (udir)
with open(udir + "/last", "w") as f:
f.write(str(random.randrange(10**6)).zfill(14))
def shufflefallback():
#rinominare con un numero casuale i file in fallback
files = [x for x in glob("fallback/*") if not os.path.basename(x) == "last"]
for file in files:
fname = str(random.randrange(10**6)).zfill(14) + "|" + "|".join(os.path.basename(file).split("|")[1:])
fname = os.path.dirname(file) + "/" + fname
os.rename(file, fname)
def getposition(file):
pl = listplaylist()
try:
return([x[0] for x in pl].index(file) + 1)
except:
pass
async def telegram_msg_parser(update: Update, context: ContextTypes.DEFAULT_TYPE) -> None:
print("Messaggio ricevuto")
msg = update.message.text
id = str(update.message.from_user.id)
username = update.message.from_user.username
urls = re.findall('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', msg)
user = "t_" + "-".join([i for i in [id, username] if i])
#print (urls)
#print (user)
if not urls:
await update.message.reply_text("Non ho trovato indirizzi validi...")
return()
await update.message.reply_text("Messaggio ricevuto. Elaboro...")
for url in urls:
#update.message.reply_text("Scarico %s" %(url))
# start the download
dl = addurl(url, user)
await update.message.reply_text(dl)
def telegram_bot():
print ("Bot avviato")
application = Application.builder().token(TELEGRAM_TOKEN).build()
# on non command i.e message - echo the message on Telegram
application.add_handler(MessageHandler(filters.TEXT, telegram_msg_parser))
# Run the bot until the user presses Ctrl-C
application.run_polling(allowed_updates=Update.ALL_TYPES)
class MastodonListener(StreamListener):
# andiamo a definire il metodo __init__, che prende una istanza di Mastodon come parametro opzionale e lo setta nella prop. self.mastodon
def __init__(self, mastodonInstance=None):
self.mastodon = mastodonInstance
def on_notification(self, notification):
print("Messaggio ricevuto")
#try:
msg = notification["status"]["content"]
id = str(notification["account"]["id"])
username = notification["account"]["acct"]
#except KeyError:
# return
msg = msg.replace("<br>", "\n")
msg = re.compile(r'<.*?>').sub('', msg)
urls = re.findall('http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', msg)
user = "m_" + "-".join([i for i in [id, username] if i])
#print (urls)
#print (user)
#visibility = notification['status']['visibility']
statusid = notification['status']['id']
if not urls:
self.mastodon.status_post("@" + username + " " + "Non ho trovato indirizzi validi...", in_reply_to_id = statusid, visibility="direct")
return()
self.mastodon.status_post("@" + username + " " + "Messaggio ricevuto. Elaboro...", in_reply_to_id = statusid, visibility="direct")
for url in urls:
# start the download
dl = addurl(url, user)
self.mastodon.status_post("@" + username + " " + dl, in_reply_to_id = statusid, visibility="direct")
def mastodon_bot():
print ("Bot avviato")
mastodon = Mastodon(access_token = MASTODON_TOKEN, api_base_url = MASTODON_URL)
listener = MastodonListener(mastodon)
mastodon.stream_user(listener)
def listtot(res = sys.maxsize):
plt = listplaylist()
if plt:
announcepos = 0
else:
announcepos = getlastannounce()
if len(plt) < res:
for x in listfallback():
if announcepos == 0:
if os.path.exists("announce/repeat.mp3"):
plt.append(["announce/repeat.mp3"])
announcepos = ANNOUNCEREPEAT - 1
else:
announcepos = (announcepos - 1)
plt.append(x)
return plt[:res]
else:
return plt
def getlastannounce():
announcepos = ANNOUNCEREPEAT - 1
try:
with open("announce/last","r") as f:
announcepos=int(f.readline().rstrip())
except:
pass
return announcepos
def setlastannounce(announcepos):
with open("announce/last","w") as f:
f.write(str(announcepos))
def consume(song):
if os.path.exists(song):
print ("Consumo la canzone " + song)
if song.split("/")[0] == "playlist":
os.remove(song)
if not [x for x in glob(os.path.dirname(song) + "/*") if not os.path.basename(x) == "last"]:
shutil.rmtree(os.path.dirname(song))
else:
with open(os.path.dirname(song) + "/last", "w") as f:
f.write(time.strftime("%Y%m%d%H%M%S"))
#resetta il contatore announcelast
setlastannounce(0)
elif song.split("/")[0] == "fallback":
fname = time.strftime("%Y%m%d%H%M%S") + "|" + "|".join(os.path.basename(song).split("|")[1:])
fname = os.path.dirname(song) + "/" + fname
os.rename(song, fname)
announcepos = getlastannounce()
print("Annuncio da " + str(announcepos) + " a " + str(announcepos - 1))
setlastannounce(announcepos - 1)
elif song.split("/")[0] == "announce":
setlastannounce(ANNOUNCEREPEAT)
def addstartannounce():
#aggiunge l'annuncio iniziale
if os.path.exists("announce/start.mp3"):
fileout = "playlist/announce/00000000000000|start|start.mp3"
copyfile("announce/start.mp3", fileout)
def copyfile(source, dest):
if not os.path.exists(dest):
os.makedirs(os.path.dirname(dest), exist_ok=True)
shutil.copy2(source, dest)
def plaympd():
client = MPDClient()
client.timeout = 10 # network timeout in seconds (floats allowed), default: None
client.idletimeout = None # timeout for fetching the result of the idle command is handled seperately, default: None
client.connect("localhost", 6600) # connect to localhost:6600
#print(client.mpd_version)
looptime = 10
synctime = 5
listlen = 10
if client.status()['state'] != "play":
if SHUFFLEUSERS:
shuffleusers()
if SHUFFLEFALLBACK:
shufflefallback()
#stoppa e svuota
client.stop()
client.clear()
#cancella la cartella mpd
if os.path.exists("mpd"):
shutil.rmtree("mpd")
os.makedirs("mpd")
client.update()
while 'updating_db' in client.status():
pass
#aggiunge l'annuncio iniziale
#addstartannounce()
#resetta il lastannounce
setlastannounce(ANNOUNCEREPEAT - 1)
#riempe la playlist
plt = listtot(listlen)
for f in plt:
print(f[0])
copyfile(f[0], "mpd/" + f[0])
client.update()
while 'updating_db' in client.status():
pass
for f in plt:
client.add(f[0])
#consuma il primo e fa play
consume(plt[0][0])
#mpdadd(client, listlen)
client.play(0)
while True:
# print("Current")
# print(client.currentsong())
# print()
# print("Status")
# print(client.status())
# print()
# print("playlist")
# print(client.playlistinfo())
# print()
#controlla se il pezzo e' il primo e consuma le precedenti
status = client.status()
if int(status['song']) > 0:
#consuma la canzone attuale
song = client.playlistinfo()[int(status['song'])]['file']
consume(song)
mpdclean(client)
# if len(client.playlistinfo()) < listlen:
# mpdsync(client, listlen)
# status = client.status()
#controlla se mancano meno di 15 secondi
#timeleft = float(status['duration']) - float(status['elapsed']) #new mpd
timeleft = float(client.currentsong()['time']) - float(status['elapsed']) #old mpd
if timeleft <= looptime + synctime:
time.sleep(max(timeleft - synctime, 0))
print ("Mancano %d secondi" % (synctime))
mpdclean(client)
mpdadd(client, listlen)
time.sleep(looptime)
def mpdclean(client):
#cancella le precedenti
for x in range(int(client.status()['song'])):
song = client.playlistinfo()[0]['file']
consume(song)
client.delete(0)
#e pulisce anche in mpd
if os.path.exists("mpd/" + song):
os.remove("mpd/" + song)
#se non ci sono + file cancella la cartella
if not glob(os.path.dirname("mpd/" + song) + "/*"):
shutil.rmtree(os.path.dirname("mpd/" + song))
def mpdadd(client, listlen):
print("Rigenero la playlist")
plt = listtot(listlen)
#copia i file
for f in plt:
#print(f[0])
copyfile(f[0], "mpd/" + f[0])
client.update()
while 'updating_db' in client.status():
pass
# #cancella tutto tranne la prima
# for x in client.playlistinfo()[1:]:
# client.delete(1)
# time.sleep(0.5)
# #e rifa la playlist
# for f in plt:
# client.add(f[0])
# time.sleep(0.5)
print("------------------")
playlist=client.playlistinfo()
for f in plt[:len(playlist)-1]:
i = plt.index(f) + 1
#print(f[0] +" - "+ playlist[i]['file'])
if f[0] != playlist[i]['file']:
i = i - 1
break
else:
print("Mantengo " + f[0])
i = i + 1
#print (i)
for x in client.playlistinfo()[i:]:
print("Cancello " + x['file'])
client.delete(i)
#e rifa la playlist
for f in plt[i-1:]:
print("Aggiungo " + f[0])
client.add(f[0])
#consumo la prima
consume(plt[0][0])
def trimaudio(fin, fout):
from pydub import AudioSegment
from pydub.silence import split_on_silence
maxlen = 240
tolerance = 15
fade = 10
audio = AudioSegment.from_file(fin, "m4a")
#print(audio.duration_seconds)
def match_target_amplitude(sound, target_dBFS):
change_in_dBFS = target_dBFS - sound.dBFS
return sound.apply_gain(change_in_dBFS)
#trim
audio = split_on_silence(audio, min_silence_len=3000, silence_thresh=-70.0, seek_step=100)[0]
#print(audio.duration_seconds)
#fade
if (audio.duration_seconds > maxlen + tolerance):
audio = audio[:maxlen*1000].fade_out(fade*1000)
#print(audio.duration_seconds)
#normalize
audio = match_target_amplitude(audio, -20.0)
audio.export(fout, format="mp4")
if __name__ == '__main__':
parser = argparse.ArgumentParser('Playlistalo')
subparsers = parser.add_subparsers(dest='command')
parser_add = subparsers.add_parser('add', help='Add song from file')
parser_add.add_argument('file', help='Song file')
parser_add.add_argument('-u', '--user', help='User that sends the file', default=None)
parser_addurl = subparsers.add_parser('addurl', help='Add song from url')
parser_addurl.add_argument('url', help='Song url')
parser_addurl.add_argument('-u', '--user', help='User that sends the file', default=None)
parser_addurl = subparsers.add_parser('list', help='List curent playlist')
parser_addurl = subparsers.add_parser('mastodon', help='Run Mastodon bot')
parser_addurl = subparsers.add_parser('telegram', help='Run Telegram bot')
parser_addurl = subparsers.add_parser('playlocal', help='Play songs locally')
parser_addurl = subparsers.add_parser('plaympd', help='Play songs using MPD')
args = parser.parse_args()
if args.command == 'add':
print(add(args.file, args.user))
elif args.command == 'addurl':
print(addurl(args.url, args.user))
elif args.command == 'list':
pl = listtot()
#print ('\n'.join([", ".join(x) for x in pl]))
print ('\n'.join([x[0] for x in pl]))
elif args.command == 'mastodon':
mastodon_bot()
elif args.command == 'telegram':
telegram_bot()
elif args.command == 'playlocal':
playlocal()
elif args.command == 'plaympd':
plaympd()
else:
parser.print_help()