playlistalo/playlistalo.py
2020-04-17 19:38:54 +02:00

359 lines
11 KiB
Python
Executable file

#!/usr/bin/env python3
#Playlistalo - simpatico script che legge le cartelle e genera la playlist
import configparser
import html
import json
import os
import random
import re
import shutil
import subprocess
import sys
import time
from glob import escape as glob_escape
from glob import glob

import validators
import youtube_dl
from mastodon import Mastodon, StreamListener
from musicpd import MPDClient
from telegram.ext import Updater, MessageHandler, Filters
# Directory that contains this script; add() looks for trimaudio.sh in it.
scriptpath = os.path.dirname(os.path.realpath(__file__))

# Create the working folders (relative to the current working directory).
# exist_ok=True replaces the separate os.path.exists() check, which could
# race with another process creating the same folder.
os.makedirs("playlist", exist_ok=True)
os.makedirs("cache", exist_ok=True)
os.makedirs("fallback", exist_ok=True)
os.makedirs("archive", exist_ok=True)

# Write a default configuration file on first run
configfile = 'playlistalo.conf'
if not os.path.exists(configfile):
    config = configparser.ConfigParser()
    config['playlistalo'] = {'ShuffleUsers': False, 'ShuffleSongs': False, 'ShuffleFallback': False, 'Telegram_token': "", 'Mastodon_token': "", 'Mastodon_url': ""}
    with open(configfile, 'w') as f:
        config.write(f)

# Read the configuration
config = configparser.ConfigParser()
config.read(configfile)
# A hand-edited file may lack the section or single options: fall back to the
# same defaults that are written on first run instead of failing at import.
if not config.has_section('playlistalo'):
    config.add_section('playlistalo')
playlistaloconf = config['playlistalo']
SHUFFLEUSERS = playlistaloconf.getboolean('ShuffleUsers', fallback=False)
SHUFFLESONGS = playlistaloconf.getboolean('ShuffleSongs', fallback=False)
SHUFFLEFALLBACK = playlistaloconf.getboolean('ShuffleFallback', fallback=False)
TELEGRAM_TOKEN = playlistaloconf.get('Telegram_token', fallback="")
MASTODON_TOKEN = playlistaloconf.get('Mastodon_token', fallback="")
MASTODON_URL = playlistaloconf.get('Mastodon_url', fallback="")
def add(url, user = "-unknown-"):
    """Download the audio behind ``url`` and queue it in ``user``'s folder.

    Steps: validate the URL, read the metadata with youtube_dl, dump it to
    ``cache/<id>.json``, download ``cache/<id>.m4a`` if it is not cached yet,
    refuse songs already present in any playlist folder, then run
    ``trimaudio.sh`` to produce ``playlist/<user>/<prefix>|<title>|<id>.m4a``.
    The prefix is a random number when SHUFFLESONGS is set, otherwise the
    current timestamp.

    Parameters:
        url:  address of the media page; surrounding whitespace is stripped.
        user: name of the playlist sub-folder.
              NOTE(review): it is used verbatim as a directory name, so
              callers must pass a path-safe value.

    Returns:
        A status string starting with "OK: " on success or "Err: " on failure.
    """
    ydl_opts = {
        'format': 'bestaudio[ext=m4a]',
        'outtmpl': 'cache/%(id)s.m4a',
        'noplaylist': True,
        'quiet': True,
    }

    url = url.strip()
    print ("url: " + url)
    print ("user: " + user)

    if not validators.url(url):
        print ('--- URL malformato ---')
        return ("Err: url non valido")

    with youtube_dl.YoutubeDL(ydl_opts) as ydl:
        try:
            meta = ydl.extract_info(url, download = False)
        except youtube_dl.DownloadError as detail:
            print ('--- Errore video non disponibile ---')
            print(str(detail))
            return ("Err: " + str(detail))

        video_id = meta.get('id').strip()
        # A missing title would otherwise break the file name concatenation
        title = __normalizetext(meta.get('title') or "")
        print ('id: %s' %(video_id))
        print ('title: %s' %(title))

        # Save the metadata next to the cached audio
        with open(os.path.join("cache", video_id + ".json"), 'w') as outfile:
            json.dump(meta, outfile, indent=4)

        # Metadata read: download the audio only if it is not cached already.
        # Possible improvement: compare upload_date with the existing json.
        filetemp = os.path.join("cache", video_id + ".m4a")
        if not os.path.isfile(filetemp):
            print ('--- Scarico ---')
            ydl.download([url])  # download() expects a list of URLs

    if not os.path.isfile(filetemp):
        return("Err: file non scaricato")

    # Skip songs already queued by anyone (the file may have been renamed,
    # so only the id is matched). The id is escaped because it comes from
    # the remote site and could contain glob metacharacters.
    if glob("playlist/**/*|" + glob_escape(video_id) + ".*"):
        print ('--- File già presente ---')
        return ("Err: %s [%s] già presente" %(title, video_id))

    userdir = os.path.join("playlist", user)
    os.makedirs(userdir, exist_ok=True)

    # Build the output file name
    if SHUFFLESONGS:
        prefix = str(random.randrange(10**6)).zfill(14)
    else:
        prefix = time.strftime("%Y%m%d%H%M%S")
    fileout = os.path.join(userdir, prefix + "|" + title + "|" + video_id + ".m4a")

    print ('--- Converto ---')
    print (fileout)
    subprocess.call([scriptpath + "/trimaudio.sh", filetemp, fileout])

    if not os.path.isfile(fileout):
        # Do not leave an empty user folder behind: rmdir only succeeds when
        # the folder holds no other song or "last" marker.
        try:
            os.rmdir(userdir)
        except OSError:
            pass
        return("Err: file non convertito")

    # Look up the queue position of the song just added
    pos = getposition(fileout)

    print ('--- Fine ---')
    print ("")
    return ("OK: %s [%s] aggiunto alla playlist in posizione #%s" %(title, video_id, pos))
def __normalizetext(s):
    """Make ``s`` safe to embed in a playlist file name.

    Removes the characters ``\\ / : * ? " < > |`` and collapses every run of
    whitespace into a single space (leading/trailing whitespace is dropped).
    ``|`` has to go because it is the field separator of playlist file names.

    Returns None when ``s`` is None, otherwise the cleaned string.
    """
    if s is None:
        return None
    # Inside a character class "|" is a literal, not an alternation, so the
    # class lists each forbidden character exactly once.
    s = re.sub(r'[\\/:*?"<>|]', '', s)
    return " ".join(s.split())
def listplaylist():
    """Return the queued songs of every user in play order.

    Each ``playlist/<user>/`` folder is scanned for ``<prefix>|<title>|<id>.m4a``
    files. Songs are ordered by their rank inside the user's folder first, so
    users alternate; ties are broken by the user's "last" value (content of
    the ``last`` file, or the prefix of the user's first song when the file
    is missing) and then by the song prefix.

    Folders without songs and files that do not follow the naming scheme are
    skipped instead of raising IndexError.

    Returns:
        A list of ``[path, user, title, id]`` lists.
    """
    pl = []
    for udir in sorted(glob("playlist/*/")):
        user = os.path.basename(os.path.dirname(udir))

        # Read the files in the folder, keeping only well-formed names
        songs = []
        for file in sorted(glob(os.path.join(udir, "*.m4a"))):
            fields = os.path.splitext(os.path.basename(file))[0].split("|")
            if len(fields) >= 3:
                songs.append((file, fields[0], fields[1], fields[2]))
        if not songs:
            continue

        # Look for the "last" marker of this user
        lastfile = os.path.join(udir, "last")
        if os.path.exists(lastfile):
            with open(lastfile, "r") as f:
                last = f.readline().rstrip()
        else:
            last = songs[0][1]

        for seq, (file, dat, nam, cod) in enumerate(songs, start=1):
            key = "-".join([str(seq).zfill(5), last, dat])
            pl.append([key, file, user, nam, cod])

    pl.sort()
    # Drop the first column, which is only needed for sorting
    return [x[1:] for x in pl]
def listfallback():
    """Return the songs in ``fallback/`` ordered by file name.

    Only ``<prefix>|<title>|<id>.m4a`` files are listed. Files that do not
    follow this scheme (e.g. copied in by hand) are skipped instead of
    raising IndexError.

    Returns:
        A list of ``[path, "fallback", title, id]`` lists.
    """
    pl = []
    seq = 0
    # Read the files in the folder
    for file in sorted(glob("fallback/*.m4a")):
        fields = os.path.splitext(os.path.basename(file))[0].split("|")
        if len(fields) < 3:
            continue
        seq += 1
        dat, nam, cod = fields[:3]
        key = "-".join([str(seq).zfill(5), dat])
        pl.append([key, file, "fallback", nam, cod])

    pl.sort()
    # Drop the first column, which is only needed for sorting
    return [x[1:] for x in pl]
def playsingle():
    """Play one song with mplayer, then consume it.

    The first entry of listplaylist() is preferred: after playback the file
    is moved to ``archive/``; the user's folder is then removed when it holds
    no more ``.m4a`` files, otherwise its ``last`` file receives the current
    timestamp. When the playlist is empty the first entry of listfallback()
    is played and renamed with a fresh timestamp prefix. With both lists
    empty nothing happens.
    """
    player = ["mplayer", "-nolirc", "-msglevel", "all=0:statusline=5"]

    queue = listplaylist()
    if queue:
        song = queue[0][0]
        folder = os.path.dirname(song)
        print (song)
        # Playback
        subprocess.call(player + [song])
        # Consume the song once it has been played
        os.rename(song, "archive/" + os.path.basename(song))
        if glob(folder + "/*.m4a"):
            # Songs left: record when this user was last served
            with open(folder + "/last", "w") as marker:
                marker.write(time.strftime("%Y%m%d%H%M%S"))
        else:
            # No songs left: drop the whole user folder
            shutil.rmtree(folder)
        return

    # Nothing queued: use the fallback list.
    # A quick-instructions jingle could be added here.
    spare = listfallback()
    if not spare:
        return
    song = spare[0][0]
    print (song)
    # Playback
    subprocess.call(player + [song])
    # Re-stamp the file with the current time
    tail = "|".join(os.path.basename(song).split("|")[1:])
    stamped = time.strftime("%Y%m%d%H%M%S") + "|" + tail
    os.rename(song, os.path.dirname(song) + "/" + stamped)
def playloop(idle_sleep = 1):
    """Play songs forever, one playsingle() call per iteration.

    Parameters:
        idle_sleep: seconds to wait before polling again when both the
                    playlist and the fallback list are empty. Without this
                    pause the loop would spin at full CPU, because
                    playsingle() returns immediately when there is nothing
                    to play.

    Never returns.
    """
    while True:
        if not (listplaylist() or listfallback()):
            time.sleep(idle_sleep)
            continue
        playsingle()
def clean():
    """Delete everything from the playlist and leave an empty ``playlist/``.

    A missing ``playlist`` folder is not an error: it is simply created.
    """
    try:
        shutil.rmtree("playlist")
    except FileNotFoundError:
        # Nothing to delete; the folder is created below
        pass
    os.makedirs("playlist", exist_ok=True)
def shuffleusers():
    """Randomise the user order by overwriting every ``last`` file.

    Each ``playlist/<user>/last`` receives a random number below one million,
    zero-padded to 14 digits.
    """
    for userdir in sorted(glob("playlist/*/")):
        with open(userdir + "/last", "w") as marker:
            marker.write("%014d" % random.randrange(10**6))
def shufflefallback():
    """Randomise the fallback order by renaming the files in ``fallback/``.

    The first ``|``-separated field of every ``<prefix>|<title>|<id>.m4a``
    file is replaced with a random number below one million, zero-padded to
    14 digits. Files that do not follow the naming scheme are left untouched:
    renaming them would drop their whole name, extension included.
    """
    for file in sorted(glob("fallback/*.m4a")):
        folder, basename = os.path.split(file)
        fields = basename.split("|")
        if len(fields) < 3:
            continue
        fname = str(random.randrange(10**6)).zfill(14) + "|" + "|".join(fields[1:])
        os.rename(file, os.path.join(folder, fname))
def getposition(file):
    """Return the 1-based position of ``file`` in the current playlist.

    ``file`` is compared with the paths returned by listplaylist().
    Returns None when the file is not queued.
    """
    pl = listplaylist()
    try:
        return [x[0] for x in pl].index(file) + 1
    except ValueError:
        # Only "not in the list" is expected here; anything else is a bug
        # and must not be swallowed.
        return None
def telegram_msg_parser(bot, update):
    """Telegram handler: queue every URL found in an incoming text message.

    Parameters:
        bot:    bot instance passed by the dispatcher (unused).
        update: incoming update; ``update.message`` supplies the text, the
                sender and ``reply_text``.

    The sender is mapped to the playlist user ``t_<id>[-<username>]``. One
    reply is sent when no URL is found; otherwise an acknowledgement is sent,
    followed by the result of add() for each URL.
    """
    print("Messaggio ricevuto")
    msg = update.message.text or ""
    sender_id = str(update.message.from_user.id)
    username = update.message.from_user.username
    # Raw string: "\(" and "\)" are invalid escapes in a normal literal
    urls = re.findall(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+', msg)
    # The username is optional on Telegram, so empty parts are left out
    user = "t_" + "-".join([i for i in [sender_id, username] if i])

    if not urls:
        update.message.reply_text("Non ho trovato indirizzi validi...")
        return

    update.message.reply_text("Messaggio ricevuto. Elaboro...")

    for url in urls:
        # Start the download and report its outcome
        dl = add(url, user)
        update.message.reply_text(dl)
def telegram_bot():
    """Run the Telegram bot until the process is interrupted.

    Builds an Updater from TELEGRAM_TOKEN, routes text messages to
    telegram_msg_parser, starts polling and then blocks in ``idle()``.
    """
    print ("Bot avviato")

    # The updater owns the dispatcher on which handlers are registered
    updater = Updater(TELEGRAM_TOKEN)

    # Every text message goes through the URL parser
    text_handler = MessageHandler(Filters.text, telegram_msg_parser)
    updater.dispatcher.add_handler(text_handler)

    # Start receiving updates, then block until Ctrl-C
    updater.start_polling()
    updater.idle()
class MastodonListener(StreamListener):
    """Stream listener that queues the URLs found in incoming notifications."""

    # Same URL pattern used by the Telegram handler (raw string: "\(" and
    # "\)" are invalid escapes in a normal literal).
    _URL_RE = re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+')

    def __init__(self, mastodonInstance=None):
        """Store the optional Mastodon client used to post the replies."""
        super().__init__()
        self.mastodon = mastodonInstance

    def _reply(self, username, statusid, text):
        """Post ``text`` as a direct reply to ``statusid``, mentioning ``username``."""
        self.mastodon.status_post("@" + username + " " + text, in_reply_to_id = statusid, visibility="direct")

    def on_notification(self, notification):
        """Queue every URL contained in the status attached to ``notification``.

        Notifications that carry no status are ignored. The sender is mapped
        to the playlist user ``m_<id>[-<acct>]``. Replies are posted as
        direct messages: one when no URL is found, otherwise an
        acknowledgement followed by the result of add() for each URL.

        NOTE(review): every notification with a status is processed, not only
        mentions — confirm whether other types should be filtered out.
        """
        print("Messaggio ricevuto")

        # Some notifications have no "status" entry; indexing it would raise
        status = notification.get("status")
        if not status:
            return

        msg = status["content"]
        account_id = str(notification["account"]["id"])
        username = notification["account"]["acct"]

        # Turn line and paragraph breaks into newlines before dropping the
        # tags, otherwise a URL gets glued to the text that follows it.
        msg = re.sub(r'<br\s*/?>|</p>', '\n', msg)
        msg = re.sub(r'<.*?>', '', msg)
        # The content is HTML: "&amp;" inside a URL must become "&" again
        msg = html.unescape(msg)

        urls = self._URL_RE.findall(msg)
        user = "m_" + "-".join([i for i in [account_id, username] if i])

        statusid = status['id']

        if not urls:
            self._reply(username, statusid, "Non ho trovato indirizzi validi...")
            return

        self._reply(username, statusid, "Messaggio ricevuto. Elaboro...")

        for url in urls:
            # Start the download and report its outcome
            dl = add(url, user)
            self._reply(username, statusid, dl)
def mastodon_bot():
    """Run the Mastodon bot.

    Creates a client from MASTODON_TOKEN and MASTODON_URL and streams the
    user timeline into a MastodonListener bound to that client.
    """
    print ("Bot avviato")
    client = Mastodon(access_token = MASTODON_TOKEN, api_base_url = MASTODON_URL)
    # The listener needs the client to post its replies
    client.stream_user(MastodonListener(client))
# Running this file directly does nothing except print a hint.
if __name__ == "__main__":
    print("This is a package, use other commands to run it")