import os import sys import errno import argparse import configparser import logging import mysql.connector import subprocess from pwd import getpwnam from grp import getgrnam # Query for IMAP/POP3 certificate mbox_list_stmt = "SELECT DISTINCT(name) FROM records WHERE content in ({}) and (name LIKE 'imap.%' or name LIKE 'pop3.%' or name LIKE 'mail.%')" # Query for SMTP certificate smtp_list_stmt = "SELECT DISTINCT(name) FROM records WHERE content in ({}) and (name LIKE 'smtp.%' or name LIKE 'mail.%')" # Get list of defined domains in vhosts configuration database domains_list_stmt = """SELECT DISTINCT(SUBSTRING_INDEX(urls.dns_name, '.', -2)) AS domain_names FROM hosts INNER JOIN (hosts_urls, urls, vhosts_features, vhosts) ON (urls.url_id = hosts_urls.url_id and urls.url_id = vhosts.url_id and vhosts.vhost_id = vhosts_features.vhost_id and hosts.host_id = hosts_urls.host_id) WHERE (hosts_urls.http = 'Y' and hosts.hostname = %(webserver)s) """ # Get list of defined urls for specific webserver urls_list_stmt = """SELECT DISTINCT(urls.dns_name) AS urls FROM hosts INNER JOIN (hosts_urls, urls, vhosts_features, vhosts) ON (urls.url_id = hosts_urls.url_id and urls.url_id = vhosts.url_id and vhosts.vhost_id = vhosts_features.vhost_id and hosts.host_id = hosts_urls.host_id) WHERE (hosts_urls.http = 'Y' and hosts.hostname = %(webserver)s) """ # Get domain_id if defined in nameserver database domain_id_stmt="SELECT domains.id as domain_id FROM domains WHERE domains.name=%(domain)s" subdomains_list_stmt = "SELECT DISTINCT(urls.dns_name) AS domain_names "\ "FROM urls INNER JOIN (hosts_urls, hosts, vhosts_features, vhosts ) "\ "ON (urls.url_id = hosts_urls.url_id and urls.url_id = vhosts.url_id and vhosts.vhost_id = vhosts_features.vhost_id) "\ "WHERE (hosts_urls.http = 'Y' and hosts.hostname = %(webserver)s and "\ "urls.dns_name LIKE %(domain)s)" default_conf_file="./etc/ot_certs.ini" logging.basicConfig(level=logging.INFO) logger = logging.getLogger() def init_prog(argv): """ Parse command line args and config file """ parser = argparse.ArgumentParser( description="Manage LetsEncrypt certificates") parser.add_argument("-c", "--config", type=open, required=False, default=default_conf_file, help="Specifity config file (default: {})".format(default_conf_file)) parser.add_argument("--renew", default=False, action='store_true', required=False, help="Invoca solamente il renew per i certificati gia' presenti") service_group = parser.add_mutually_exclusive_group(required=True) service_group.add_argument("--proxy", default=False, action='store_true', required=False, help="Richiedi i certificati per i siti proxaty") service_group.add_argument("--liste", default=False, action='store_true', required=False, help="Richiedi i certificati per liste.indivia.net") service_group.add_argument("--hosting", default=False, action='store_true', required=False, help="Richiedi i certificati per i siti in hosting") service_group.add_argument("--webmail", default=False, action='store_true', required=False, help="Richiedi i certificati per le webmail") service_group.add_argument("--smtp", default=False, action='store_true', required=False, help="Richiedi i certificati per il server SMTP") service_group.add_argument("--mbox", default=False, action='store_true', required=False, help="Richiedi i certificati per il server POP/IMAP") args = parser.parse_args() try: config = configparser.ConfigParser() config.read_file(args.config) except Exception as e: logger.error("Error parsing configuration {}".format(e)) exit(-1) return args, config def connect_db(conf_dict): try: cnx = mysql.connector.connect(**conf_dict) except mysql.connector.Error as err: if err.errno == mysql.connector.errorcode.ER_ACCESS_DENIED_ERROR: logger.error("Something is wrong with your user name or password") elif err.errno == mysql.connector.errorcode.ER_BAD_DB_ERROR: logger.error("Database does not exist") else: logger.error(err) return None return cnx def get_subdomain_list(config, domain, ot_conn, ex_subdomains=tuple()): """ Return a Python list containing subdomain of domain parameter eg: ['app.arkiwi.org'] """ result_dict=dict() ot_cursor=ot_conn.cursor() ot_cursor.execute(subdomains_list_stmt, {'webserver':config['main']['webserver'].strip(" '\""), 'domain':"%{}%".format(domain)}) subdomains_res = ot_cursor.fetchall() ot_cursor.close() try: subdomains_filtered = [s[0].decode('utf-8') for s in subdomains_res if not(s[0].decode('utf-8').startswith(ex_subdomains))] except AttributeError: subdomains_filtered = [s[0] for s in subdomains_res if not(s[0].startswith(ex_subdomains))] return subdomains_filtered def get_domain_list(config, ot_conn, dns_conn): """ Return a dictionary of domains and properties eg:{'indivia.net': {manage_ns=True, domain_id=92}, 'metalabs.org': {managed_ns=False}} """ result_dict=dict() ot_cursor=ot_conn.cursor() ot_cursor.execute(domains_list_stmt, {'webserver':config['main']['webserver'].strip(" '\"")}) dns_cursor=dns_conn.cursor() for domain_barr, in ot_cursor: try: domain_name = domain_barr.decode("utf-8") except AttributeError: domain_name = domain_barr; logger.debug(domain_barr) try: dns_cursor.execute(domain_id_stmt, {'domain':domain_name}) except Exception as e: logger.error(e) exit(-1) dns_res = dns_cursor.fetchall() if dns_cursor.rowcount == 1 : result_dict.update({domain_name: {'managed_ns':True, 'domain_id':dns_res[0][0]}}) elif dns_cursor.rowcount == 0: result_dict.update({domain_name: {'managed_ns':False, 'domain_id':None}}) else: logger.error('Unexpected result for domain {}'.format(domain_name)) dns_cursor.close() ot_cursor.close() return result_dict def get_url_list(config_section, server_name, ot_conn, dns_conn): """ Ritorna la lista delle url configurate per uno specifico server_name NB: il questo momento il dato viene estratto dal db di ortiche, ma non viene controllato se il dns e' configurato in maniera coerente. Questo potrebbe generare errori in momenti successivi (es, durante il challenge HTTP-01) """ urls_list = [] ot_cursor=ot_conn.cursor() ot_cursor.execute(urls_list_stmt, {'webserver':server_name}) ot_res = ot_cursor.fetchall() logger.debug(ot_res) urls_list = [t[0] for t in ot_res] ot_cursor.close() return urls_list def get_alias_list(config, dns_conn, query, aliases): """ Return a list of domains to get the certificate for """ result_list = list() dns_cursor=dns_conn.cursor() try: dns_cursor.execute(query, aliases) except Exception as e: logger.error(e) exit(-1) dns_res = dns_cursor.fetchall() result_list = [name[0].decode('utf-8') for name in dns_res] dns_cursor.close() return result_list def acme_renew(config, pre_hook_cmd, post_hook_cmd, dryrun=False): args = config['certbot']['base_args'] # args += " -m {} ".format(config['certbot']['email']) # args += "--server {} ".format(config['certbot']['server']) if dryrun: args += "--dry-run " if not pre_hook_cmd is None: args +=' --pre-hook "{}"'.format(pre_hook_cmd) if not post_hook_cmd is None: args +=' --post-hook "{}"'.format(post_hook_cmd) args += " renew" if dryrun: logging.info("{} {}".format(config['certbot']['bin'], args)) else: os.system("{} {}".format(config['certbot']['bin'], args)) return True def acme_request(config, domain_name, acme_test='DNS-01', webroot=None, dryrun=False, domains_list=None): args = config['certbot']['base_args'] args += " -m {} ".format(config['certbot']['email']) args += "--server {} ".format(config['certbot']['server']) if dryrun: args += "--dry-run " if acme_test == 'DNS-01': args += " --manual certonly " args += "--preferred-challenges dns-01 " args += "--manual-auth-hook {} ".format(config['certbot']['auth_hook']) args += "--manual-cleanup-hook {} ".format(config['certbot']['cleanup_hook']) args += "-d {},*.{}".format(domain_name, domain_name) elif acme_test == 'HTTP-01': args += " --webroot certonly " args += "--preferred-challenges http-01 " if webroot is None: args += "-w {}/{}/htdocs ".format(config['apache']['webroot'], domain_name) else: args += "-w {} ".format(webroot) if domains_list is None: args += "-d {}".format(domain_name) else: args += "--expand --cert-name {} ".format(domain_name) args += "-d {}".format(",".join(domains_list)) else: logger.error('acme test {} not supported'.format(acme_test)) return False if dryrun: logging.info("{} {}".format(config['certbot']['bin'], args)) else: os.system("{} {}".format(config['certbot']['bin'], args)) return True def symlink_force(target, link_name): try: os.symlink(target, link_name) except Exception as e: if e.errno == errno.EEXIST: os.remove(link_name) os.symlink(target, link_name) else: raise e def link_cert(config, source, dest, dryrun=False): src_name = os.path.join(config['certbot']['live_certificates_dir'], source) link_name = os.path.join(config['apache']['certificates_root'], dest) if dryrun: logger.info('{} -> {}'.format(link_name,src_name)) return else: symlink_force(src_name, link_name) def fix_permissions(config): """ Sistema i permessi dei certificati affinche' risultino leggibili dai demoni interessati """ archive_dir = config['certbot']['archive_certificates_dir'] uid = getpwnam(config['certbot']['certificates_user'])[2] gid = getgrnam(config['certbot']['certificates_group'])[2] for root, dirs, files in os.walk(archive_dir): for momo in dirs: logger.debug('Fixing user/group and permissions on {}'.format(os.path.join(root, momo))) os.chown(os.path.join(root, momo), uid, gid) os.chmod(os.path.join(root, momo), 0o755) for momo in files: logger.debug('Fixing user/group and permissions on {}'.format(os.path.join(root, momo))) os.chown(os.path.join(root, momo), uid, gid) if momo.startswith('privkey'): os.chmod(os.path.join(root, momo), 0o640) else: os.chmod(os.path.join(root, momo), 0o644) if __name__ == '__main__': args, config = init_prog(sys.argv) dryrun=config['main'].getboolean('dryrun') service_reload = dict() if dryrun: print("DRYRUN, nessun certificato verra' richiesto, nessun link/file creato o modificato") if args.renew: pre_hook_cmd = None post_hook_cmd = None logging.info('Renewing certificates ') if args.webmail or args.hosting or args.liste: post_hook_cmd = "systemctl reload apache2" elif args.smtp: post_hook_cmd = "systemctl reload postfix" elif args.mbox: post_hook_cmd = "systemctl restart dovecot" elif args.proxy: post_hook_cmd = "systemctl reload nginx" logger.debug("post_hook_cmd: {}".format(post_hook_cmd)) if acme_renew(config, pre_hook_cmd, post_hook_cmd, dryrun=dryrun): logger.info("Done renew") else: # Fai le nuove richieste per i certificati # Caso speciale per le webmail if args.webmail: logging.info('Asking certificates for webmail') vhost_name = config['webmail']['vhost'].strip() webmails_list = ["webmail.{}".format(d.strip()) for d in config['webmail']['domains'].split(',') if len(d.strip())>0] logging.info('vhost {}, domains_list {}'.format(vhost_name, webmails_list)) if acme_request(config, vhost_name, acme_test='HTTP-01', dryrun=dryrun, domains_list=webmails_list): link_cert(config, vhost_name, vhost_name, dryrun=dryrun) else: logger.error('Error asking certificate for {}'.format(vhost_name)) # reload apache logger.info("Reloading apache") # ret = subprocess.run("systemctl reload apache2") ret = os.system("systemctl reload apache2") logger.info(ret) # Caso speciale per il proxy if args.proxy: logging.info('Asking certificates for proxy web domains') try: proxy_conf = config['nginx'] except KeyError as e: logger.error("Error parsing configuration, KeyError {}".format(e)) exit(-1) ot_conn=connect_db(dict(config['ot_db'])) upstream_servers = [s.strip() for s in proxy_conf['upstream_servers'].split(',') if len(s.strip())>0] for server_name in upstream_servers: logger.debug("Upstream server {}".format(server_name)) url_list = get_url_list(proxy_conf, server_name, ot_conn, None) logger.debug(url_list) for url in url_list: acme_request(config, url, acme_test='HTTP-01', webroot=proxy_conf['http-01_webroot'], dryrun=dryrun, domains_list=[url]) ot_conn.close() if not dryrun: fix_permissions(config) logger.info("Reloading nginx") ret = os.system("systemctl reload nginx") logger.info(ret) # Caso speciale per l'hosting if args.hosting: logging.info('Asking certificates for hosted web domains') try: hosting_conf = config['apache'] except KeyError as e: logger.error("Error parsing configuration, KeyError {}".format(e)) exit(-1) ot_conn=connect_db(dict(config['ot_db'])) dns_conn=connect_db(dict(config['dns_db'])) # Subdomains da escludere ex_subdomains = tuple([s.strip() for s in config['main']['special_subdomains'].split(',') if len(s.strip())>0]) domains_dict = get_domain_list(config, ot_conn, dns_conn) logger.debug(domains_dict) for domain_name, domain_feat in domains_dict.items(): domain_feat['subdomains']=get_subdomain_list(config, domain_name, ot_conn, ex_subdomains=ex_subdomains) # Controlla se i nameserver sono gestiti da noi if domain_feat['managed_ns']: # Nel caso il nameserver sia gestito, chiedi certificati per il dominio e la wildcard logger.info('Get certificates for {}, *.{}'.format(domain_name, domain_name)) if acme_request(config, domain_name, acme_test='DNS-01', dryrun=dryrun): link_cert(config, domain_name, domain_name, dryrun=dryrun) # Crea il link per ogni subdomain for subdomain in domain_feat['subdomains']: link_cert(config, domain_name, subdomain, dryrun=dryrun) else: # Nel caso i nameserver NON siano gestiti, allora chiedi un certificato per ogni sottodominio # Crea il link per ogni subdomain for subdomain in domain_feat['subdomains']: logger.info('Get certificates for {}'.format(subdomain)) if acme_request(config, subdomain, acme_test='HTTP-01', dryrun=dryrun): link_cert(config, subdomain, subdomain, dryrun=dryrun) ot_conn.close() dns_conn.close() # reload apache logger.info("Reloading apache") # ret = subprocess.run("systemctl reload apache2") ret = os.system("systemctl reload apache2") logger.info(ret) # Caso speciale per l'interfaccia di mailman if args.liste: logging.info('Asking certificates for liste.indivia.net') vhost_name = config['mailman']['vhost'].strip() liste_list = ["liste.{}".format(d.strip()) for d in config['mailman']['domains'].split(',') if len(d.strip())>0] if acme_request(config, vhost_name, acme_test='HTTP-01', dryrun=dryrun, domains_list=liste_list): link_cert(config, vhost_name, vhost_name, dryrun=dryrun) else: logger.error('Error asking certificate for {}'.format(vhost_name)) # reload apache logger.info("Reloading apache") # ret = subprocess.run("systemctl reload apache2") ret = os.system("systemctl reload apache2") logger.info(ret) # Caso speciale per il server POP/IMAP if args.mbox: dns_conn=connect_db(dict(config['dns_db'])) logging.info('Asking certificates for POP/IMAP server') vhost_name = config['mail']['mbox_vhost'].strip() server_addresses = [s.strip() for s in config['mail']['mbox_server_addresses'].split(',') if len(s.strip())>0] mbox_fmt = ','.join(['%s'] * len(server_addresses)) mbox_query = mbox_list_stmt.format(mbox_fmt) alias_list = get_alias_list(config, dns_conn, mbox_query, server_addresses) # Per usi futuri, aggiungo l'alias 'mail.indivia.net' alias_list.append('mail.indivia.net') logging.info('vhost {}, domains_list {}'.format(vhost_name, alias_list)) if acme_request(config, vhost_name, acme_test='HTTP-01', webroot=config['mail']['mbox_webroot'].strip(), dryrun=dryrun, domains_list=alias_list): # non e' richiesto il link, punto direttamente le configurazioni alle dir di letsencrypt # link_cert(config, vhost_name, vhost_name, dryrun=dryrun) service_reload['mbox'] = True pass else: logger.error('Error asking certificate for {}'.format(vhost_name)) dns_conn.close() # restart dovecot logger.info("Restarting dovecot") # ret = subprocess.run("systemctl restart dovecot") ret = os.system("systemctl restart dovecot") logger.info(ret) # Caso speciale per il server SMTP if args.smtp: logging.info('Asking certificates for SMTP server') dns_conn=connect_db(dict(config['dns_db'])) vhost_name = config['mail']['smtp_vhost'].strip() server_addresses = [s.strip() for s in config['mail']['smtp_server_addresses'].split(',') if len(s.strip())>0] smtp_fmt = ','.join(['%s'] * len(server_addresses)) smtp_query = smtp_list_stmt.format(smtp_fmt) alias_list = get_alias_list(config, dns_conn, smtp_query, server_addresses) logging.info('vhost {}, domains_list {}'.format(vhost_name, alias_list)) if acme_request(config, vhost_name, acme_test='HTTP-01', webroot=config['mail']['smtp_webroot'].strip(), dryrun=dryrun, domains_list=alias_list): # non e' richiesto il link, punto direttamente le configurazioni alle dir di letsencrypt # link_cert(config, vhost_name, vhost_name, dryrun=dryrun) service_reload['smtp'] = True pass else: logger.error('Error asking certificate for {}'.format(vhost_name)) dns_conn.close() # reload postfix logger.info("Restarting postfix") # ret = subprocess.run("systemctl reload postfix") ret = os.system("systemctl reload postfix") logger.info(ret)