"""Manage Let's Encrypt certificates through certbot.

Depending on the command line switch, the script requests (or renews)
certificates for the webmail vhost, the hosted web sites, the mailing-list
web interface, the POP/IMAP server or the SMTP server, then reloads or
restarts the matching service.
"""
import os
import sys
import errno
import argparse
import configparser
import logging
import shlex
import subprocess

try:
    import mysql.connector
    from mysql.connector import errorcode as mysql_errorcode
except ImportError:
    # The MySQL driver is only needed by the modes that query a database
    # (--hosting, --mbox, --smtp); connect_db() reports its absence.
    mysql = None
    mysql_errorcode = None

# Query for IMAP/POP3 certificate
mbox_list_stmt = "SELECT DISTINCT(name) FROM records WHERE content in ({}) and (name LIKE 'imap.%' or name LIKE 'pop3.%' or name LIKE 'mail.%')"

# Query for SMTP certificate
smtp_list_stmt = "SELECT DISTINCT(name) FROM records WHERE content in ({}) and (name LIKE 'smtp.%' or name LIKE 'mail.%')"

# Get list of defined domains in vhosts configuration database
domains_list_stmt = """SELECT DISTINCT(SUBSTRING_INDEX(urls.dns_name, '.', -2)) AS domain_names
FROM hosts INNER JOIN (hosts_urls, urls, vhosts_features, vhosts)
ON (urls.url_id = hosts_urls.url_id and urls.url_id = vhosts.url_id and vhosts.vhost_id = vhosts_features.vhost_id and hosts.host_id = hosts_urls.host_id)
WHERE (hosts_urls.http = 'Y' and hosts.hostname = %(webserver)s)
"""

# Get domain_id if defined in nameserver database
domain_id_stmt = "SELECT domains.id as domain_id FROM domains WHERE domains.name=%(domain)s"

# Get the URLs served by the web server whose name matches a domain.
# The hosts/hosts_urls join condition mirrors domains_list_stmt: without it
# `hosts` is cross-joined and the webserver filter does not restrict the URLs.
subdomains_list_stmt = "SELECT DISTINCT(urls.dns_name) AS domain_names "\
    "FROM urls INNER JOIN (hosts_urls, hosts, vhosts_features, vhosts ) "\
    "ON (urls.url_id = hosts_urls.url_id and urls.url_id = vhosts.url_id and vhosts.vhost_id = vhosts_features.vhost_id "\
    "and hosts.host_id = hosts_urls.host_id) "\
    "WHERE (hosts_urls.http = 'Y' and hosts.hostname = %(webserver)s and "\
    "urls.dns_name LIKE %(domain)s)"

default_conf_file = "./etc/ot_certs.ini"

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger()


def _to_text(value):
    """Return `value` as str, decoding it from UTF-8 when it is bytes."""
    if isinstance(value, (bytes, bytearray)):
        return value.decode('utf-8')
    return value


def _split_csv(value):
    """Split a comma separated config value, dropping empty items."""
    return [item.strip() for item in value.split(',') if item.strip()]


def _run_shell(command):
    """Run `command` through the shell and return the os.system() status."""
    return os.system(command)


def _certbot_command(config, options):
    """Join the certbot binary, its base arguments and `options`."""
    parts = [config['certbot']['bin'], config['certbot']['base_args'].strip()]
    parts.extend(options)
    # Joining with a single space guarantees that options never fuse with
    # base_args, whatever trailing whitespace the config value has.
    return " ".join(part for part in parts if part)


def _run_certbot(command, dryrun):
    """Log `command` on dry runs, otherwise run it; True means success."""
    if dryrun:
        logger.info(command)
        return True
    status = _run_shell(command)
    if status != 0:
        logger.error("Command '{}' exited with status {}".format(command, status))
    return status == 0


def init_prog(argv):
    """
    Parse command line args and config file.

    argv is the full argument vector (program name first, like sys.argv).
    Returns the (args, config) pair; args.config holds the config file path.
    Exits with status -1 when the config file cannot be read or parsed.
    """
    parser = argparse.ArgumentParser(
        description="Manage LetsEncrypt certificates")
    parser.add_argument("-c", "--config", required=False, default=default_conf_file,
                        help="Specifity config file (default: {})".format(default_conf_file))
    parser.add_argument("--renew", default=False, action='store_true', required=False,
                        help="Invoca solamente il renew per i certificati gia' presenti")
    service_group = parser.add_mutually_exclusive_group(required=True)
    service_group.add_argument("--liste", default=False, action='store_true', required=False,
                               help="Richiedi i certificati per liste.indivia.net")
    service_group.add_argument("--hosting", default=False, action='store_true', required=False,
                               help="Richiedi i certificati per i siti in hosting")
    service_group.add_argument("--webmail", default=False, action='store_true', required=False,
                               help="Richiedi i certificati per le webmail")
    service_group.add_argument("--smtp", default=False, action='store_true', required=False,
                               help="Richiedi i certificati per il server SMTP")
    service_group.add_argument("--mbox", default=False, action='store_true', required=False,
                               help="Richiedi i certificati per il server POP/IMAP")
    # Honour the argv that was passed in; fall back to sys.argv when empty.
    args = parser.parse_args(argv[1:] if argv else None)

    config = configparser.ConfigParser()
    try:
        # The context manager closes the file as soon as it has been parsed.
        with open(args.config) as conf_file:
            config.read_file(conf_file)
    except (OSError, UnicodeDecodeError, configparser.Error) as e:
        logger.error("Error parsing configuration {}".format(e))
        sys.exit(-1)
    return args, config


def connect_db(conf_dict):
    """
    Open a MySQL connection using conf_dict as connect() keyword arguments.

    Returns the connection, or None (after logging the reason) when the
    driver is missing or the connection attempt fails.
    """
    if mysql is None:
        logger.error("mysql.connector is not installed")
        return None
    try:
        return mysql.connector.connect(**conf_dict)
    except mysql.connector.Error as err:
        if err.errno == mysql_errorcode.ER_ACCESS_DENIED_ERROR:
            logger.error("Something is wrong with your user name or password")
        elif err.errno == mysql_errorcode.ER_BAD_DB_ERROR:
            logger.error("Database does not exist")
        else:
            logger.error(err)
        return None


def get_subdomain_list(config, domain, ot_conn, ex_subdomains=tuple()):
    """
    Return a Python list containing the host names that belong to `domain`
    (the domain itself included), eg: ['app.arkiwi.org'].

    Names starting with one of the ex_subdomains prefixes are left out.
    """
    excluded = tuple(ex_subdomains)
    params = {'webserver': config['main']['webserver'].strip(" '\""),
              'domain': "%{}%".format(domain)}
    ot_cursor = ot_conn.cursor()
    ot_cursor.execute(subdomains_list_stmt, params)
    rows = ot_cursor.fetchall()
    ot_cursor.close()

    # The LIKE '%domain%' pattern also matches unrelated names such as
    # 'myexample.org' for 'example.org'; keep only the domain itself and
    # names that end with '.<domain>'.
    wanted = domain.lower()
    suffix = "." + wanted
    names = [_to_text(row[0]) for row in rows]
    return [name for name in names
            if (name.lower() == wanted or name.lower().endswith(suffix))
            and not name.startswith(excluded)]


def get_domain_list(config, ot_conn, dns_conn):
    """
    Return a dictionary of domains and properties
    eg: {'indivia.net': {'managed_ns': True, 'domain_id': 92},
         'metalabs.org': {'managed_ns': False, 'domain_id': None}}

    A domain is flagged as managed when exactly one row for it exists in the
    nameserver database. Domains with more than one row are logged and
    skipped. Exits with status -1 when the nameserver query fails.
    """
    result_dict = dict()
    ot_cursor = ot_conn.cursor()
    ot_cursor.execute(domains_list_stmt,
                      {'webserver': config['main']['webserver'].strip(" '\"")})
    # Read every row up front so the cursor holds no pending result while
    # the second database is being queried.
    domain_rows = ot_cursor.fetchall()
    ot_cursor.close()

    dns_cursor = dns_conn.cursor()
    for (raw_name,) in domain_rows:
        domain_name = _to_text(raw_name)
        logger.debug(raw_name)
        try:
            dns_cursor.execute(domain_id_stmt, {'domain': domain_name})
        except Exception as e:
            logger.error(e)
            sys.exit(-1)
        dns_res = dns_cursor.fetchall()
        if len(dns_res) == 1:
            result_dict[domain_name] = {'managed_ns': True, 'domain_id': dns_res[0][0]}
        elif not dns_res:
            result_dict[domain_name] = {'managed_ns': False, 'domain_id': None}
        else:
            logger.error('Unexpected result for domain {}'.format(domain_name))
    dns_cursor.close()
    return result_dict


def get_alias_list(config, dns_conn, query, aliases):
    """
    Return a list of domains to get the certificate for.

    `query` is executed with `aliases` as parameters; the first column of
    every row is returned as str (bytes values are decoded from UTF-8).
    Exits with status -1 when the query fails.
    """
    dns_cursor = dns_conn.cursor()
    try:
        dns_cursor.execute(query, aliases)
    except Exception as e:
        logger.error(e)
        sys.exit(-1)
    dns_res = dns_cursor.fetchall()
    dns_cursor.close()
    # The driver may hand back either bytes or str depending on its version.
    return [_to_text(row[0]) for row in dns_res]


def acme_renew(config, pre_hook_cmd, post_hook_cmd, dryrun=False):
    """
    Run `certbot renew`, optionally with pre/post hook commands.

    With dryrun the command line is only logged. Returns True on success
    (always on a dry run), False when certbot exits with a non-zero status.
    """
    options = []
    if dryrun:
        options.append("--dry-run")
    if pre_hook_cmd is not None:
        options.append("--pre-hook {}".format(shlex.quote(pre_hook_cmd)))
    if post_hook_cmd is not None:
        options.append("--post-hook {}".format(shlex.quote(post_hook_cmd)))
    options.append("renew")
    return _run_certbot(_certbot_command(config, options), dryrun)


def acme_request(config, domain_name, acme_test='DNS-01', webroot=None, dryrun=False, domains_list=None):
    """
    Ask certbot for a certificate.

    acme_test selects the challenge:
      'DNS-01'  - certificate for domain_name and *.domain_name, validated
                  through the configured manual auth/cleanup hooks;
      'HTTP-01' - webroot validation. The webroot defaults to
                  <apache webroot>/<domain_name>/htdocs; when domains_list
                  is given the certificate is named domain_name and covers
                  the listed names.

    With dryrun the command line is only logged. Returns False for an
    unsupported acme_test or when certbot exits with a non-zero status,
    True otherwise.
    """
    options = ["-m {}".format(config['certbot']['email']),
               "--server {}".format(config['certbot']['server'])]
    if dryrun:
        options.append("--dry-run")

    if acme_test == 'DNS-01':
        options.append("--manual certonly")
        options.append("--preferred-challenges dns-01")
        options.append("--manual-auth-hook {}".format(config['certbot']['auth_hook']))
        options.append("--manual-cleanup-hook {}".format(config['certbot']['cleanup_hook']))
        # Quoted so the shell never tries to glob-expand the wildcard name.
        options.append("-d {}".format(shlex.quote("{0},*.{0}".format(domain_name))))
    elif acme_test == 'HTTP-01':
        options.append("--webroot certonly")
        options.append("--preferred-challenges http-01")
        if webroot is None:
            options.append("-w {}/{}/htdocs".format(config['apache']['webroot'],
                                                    shlex.quote(domain_name)))
        else:
            options.append("-w {}".format(webroot))
        if domains_list is None:
            options.append("-d {}".format(shlex.quote(domain_name)))
        else:
            options.append("--expand --cert-name {}".format(shlex.quote(domain_name)))
            options.append("-d {}".format(shlex.quote(",".join(domains_list))))
    else:
        logger.error('acme test {} not supported'.format(acme_test))
        return False

    return _run_certbot(_certbot_command(config, options), dryrun)


def symlink_force(target, link_name):
    """Create the symlink link_name -> target, replacing an existing entry."""
    try:
        os.symlink(target, link_name)
    except OSError as e:
        # Only "already exists" is recoverable; anything else is re-raised.
        if e.errno != errno.EEXIST:
            raise
        os.remove(link_name)
        os.symlink(target, link_name)


def link_cert(config, source, dest, dryrun=False):
    """
    Link the live certificate directory of `source` into the apache
    certificates root under the name `dest`. With dryrun the link is only
    logged.
    """
    src_name = os.path.join(config['certbot']['live_certificates_dir'], source)
    link_name = os.path.join(config['apache']['certificates_root'], dest)
    if dryrun:
        logger.info('{} -> {}'.format(link_name, src_name))
        return
    symlink_force(src_name, link_name)


def _open_db_or_exit(config, section):
    """Connect using the given config section; exit(-1) when that fails."""
    conn = connect_db(dict(config[section]))
    if conn is None:
        # connect_db() has already logged the precise reason.
        logger.error("Cannot connect to database '{}'".format(section))
        sys.exit(-1)
    return conn


def _run_service_command(message, command):
    """Log `message`, run the service command and log its status."""
    logger.info(message)
    ret = _run_shell(command)
    logger.info(ret)


def _renew_certificates(args, config, dryrun):
    """Renew the existing certificates, reloading the selected service."""
    pre_hook_cmd = None
    post_hook_cmd = None
    logger.info('Renewing certificates ')
    if args.webmail or args.hosting or args.liste:
        post_hook_cmd = "systemctl reload apache2"
    elif args.smtp:
        post_hook_cmd = "systemctl reload postfix"
    elif args.mbox:
        post_hook_cmd = "systemctl restart dovecot"
    logger.debug("post_hook_cmd: {}".format(post_hook_cmd))
    if acme_renew(config, pre_hook_cmd, post_hook_cmd, dryrun=dryrun):
        logger.info("Done renew")
    else:
        logger.error("Error renewing certificates")


def _request_web_bundle(config, section, prefix, label, dryrun):
    """Request one certificate covering '<prefix>.<domain>' for every domain
    listed in the config section, link it and reload apache."""
    logger.info('Asking certificates for {}'.format(label))
    vhost_name = config[section]['vhost'].strip()
    names = ["{}.{}".format(prefix, d) for d in _split_csv(config[section]['domains'])]
    logger.info('vhost {}, domains_list {}'.format(vhost_name, names))
    if acme_request(config, vhost_name, acme_test='HTTP-01', dryrun=dryrun, domains_list=names):
        link_cert(config, vhost_name, vhost_name, dryrun=dryrun)
    else:
        logger.error('Error asking certificate for {}'.format(vhost_name))
    _run_service_command("Reloading apache", "systemctl reload apache2")


def _request_hosting(config, dryrun):
    """Request and link the certificates of every hosted web domain."""
    logger.info('Asking certificates for hosted web domains')
    ot_conn = _open_db_or_exit(config, 'ot_db')
    dns_conn = _open_db_or_exit(config, 'dns_db')
    try:
        # Subdomain prefixes to exclude
        ex_subdomains = tuple(_split_csv(config['main']['special_subdomains']))
        domains_dict = get_domain_list(config, ot_conn, dns_conn)
        logger.debug(domains_dict)
        for domain_name, domain_feat in domains_dict.items():
            domain_feat['subdomains'] = get_subdomain_list(
                config, domain_name, ot_conn, ex_subdomains=ex_subdomains)
            # Check whether the nameservers are managed by us
            if domain_feat['managed_ns']:
                # Managed nameserver: ask one certificate for the domain
                # and its wildcard
                logger.info('Get certificates for {}, *.{}'.format(domain_name, domain_name))
                if acme_request(config, domain_name, acme_test='DNS-01', dryrun=dryrun):
                    link_cert(config, domain_name, domain_name, dryrun=dryrun)
                    # Create the link for every subdomain
                    for subdomain in domain_feat['subdomains']:
                        link_cert(config, domain_name, subdomain, dryrun=dryrun)
                else:
                    logger.error('Error asking certificate for {}'.format(domain_name))
            else:
                # Nameservers NOT managed by us: ask one certificate per
                # subdomain and link each of them
                for subdomain in domain_feat['subdomains']:
                    logger.info('Get certificates for {}'.format(subdomain))
                    if acme_request(config, subdomain, acme_test='HTTP-01', dryrun=dryrun):
                        link_cert(config, subdomain, subdomain, dryrun=dryrun)
                    else:
                        logger.error('Error asking certificate for {}'.format(subdomain))
    finally:
        ot_conn.close()
        dns_conn.close()
    _run_service_command("Reloading apache", "systemctl reload apache2")


def _request_mail_cert(config, prefix, label, list_stmt, extra_aliases,
                       service_msg, service_cmd, dryrun):
    """Request the certificate of a mail service ('mbox' or 'smtp' config
    prefix) for the DNS names pointing at its servers, then run the
    service command."""
    logger.info('Asking certificates for {}'.format(label))
    dns_conn = _open_db_or_exit(config, 'dns_db')
    try:
        vhost_name = config['mail'][prefix + '_vhost'].strip()
        server_addresses = _split_csv(config['mail'][prefix + '_server_addresses'])
        placeholders = ','.join(['%s'] * len(server_addresses))
        alias_list = get_alias_list(config, dns_conn, list_stmt.format(placeholders),
                                    server_addresses)
        for alias in extra_aliases:
            # The query may already have returned the extra name.
            if alias not in alias_list:
                alias_list.append(alias)
        logger.info('vhost {}, domains_list {}'.format(vhost_name, alias_list))
        # No link is needed: the service configuration points straight at
        # the letsencrypt directories.
        if not acme_request(config, vhost_name, acme_test='HTTP-01',
                            webroot=config['mail'][prefix + '_webroot'].strip(),
                            dryrun=dryrun, domains_list=alias_list):
            logger.error('Error asking certificate for {}'.format(vhost_name))
    finally:
        dns_conn.close()
    _run_service_command(service_msg, service_cmd)


def main(argv=None):
    """Script entry point: parse the arguments and run the selected mode."""
    args, config = init_prog(sys.argv if argv is None else argv)
    dryrun = config['main'].getboolean('dryrun')
    if dryrun:
        print("DRYRUN, nessun certificato verra' richiesto, nessun link/file creato o modificato")

    if args.renew:
        _renew_certificates(args, config, dryrun)
        return

    # Otherwise issue the new certificate requests
    if args.webmail:
        # Special case: webmail
        _request_web_bundle(config, 'webmail', 'webmail', 'webmail', dryrun)
    if args.hosting:
        # Special case: hosting
        _request_hosting(config, dryrun)
    if args.liste:
        # Special case: mailman web interface
        _request_web_bundle(config, 'mailman', 'liste', 'liste.indivia.net', dryrun)
    if args.mbox:
        # Special case: POP/IMAP server. 'mail.indivia.net' is added for
        # future use.
        _request_mail_cert(config, 'mbox', 'POP/IMAP server', mbox_list_stmt,
                           ['mail.indivia.net'],
                           "Restarting dovecot", "systemctl restart dovecot", dryrun)
    if args.smtp:
        # Special case: SMTP server
        _request_mail_cert(config, 'smtp', 'SMTP server', smtp_list_stmt, [],
                           "Restarting postfix", "systemctl reload postfix", dryrun)


if __name__ == '__main__':
    main(sys.argv)