From cf94a720334f236833a0b8a3bd6fcd225c4b94c7 Mon Sep 17 00:00:00 2001 From: jigen Date: Sun, 10 Jun 2018 22:56:31 +0200 Subject: [PATCH] First version of Python port --- .gitignore | 3 + OTcerts.py | 217 +++++++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 220 insertions(+) create mode 100644 .gitignore create mode 100644 OTcerts.py diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..cbf8671 --- /dev/null +++ b/.gitignore @@ -0,0 +1,3 @@ +etc/ +#.*# +.*~ diff --git a/OTcerts.py b/OTcerts.py new file mode 100644 index 0000000..22326e2 --- /dev/null +++ b/OTcerts.py @@ -0,0 +1,217 @@ + +import os +import sys +import errno +import argparse +import configparser +import logging +import mysql.connector + +# Get list of defined domains in vhosts configuration database +domains_list_stmt = """SELECT DISTINCT(SUBSTRING_INDEX(urls.dns_name, '.', -2)) AS domain_names +FROM urls INNER JOIN (hosts_urls, hosts, vhosts_features, vhosts) +ON (urls.url_id = hosts_urls.url_id and urls.url_id = vhosts.url_id and vhosts.vhost_id = vhosts_features.vhost_id) +WHERE (hosts_urls.http = 'Y' and hosts.hostname = %(webserver)s) +""" + +# Get domain_id if defined in nameserver database +domain_id_stmt="SELECT domains.id as domain_id FROM domains WHERE domains.name=%(domain)s" + +subdomains_list_stmt = "SELECT DISTINCT(urls.dns_name) AS domain_names "\ + "FROM urls INNER JOIN (hosts_urls, hosts, vhosts_features, vhosts ) "\ + "ON (urls.url_id = hosts_urls.url_id and urls.url_id = vhosts.url_id and vhosts.vhost_id = vhosts_features.vhost_id) "\ + "WHERE (hosts_urls.http = 'Y' and hosts.hostname = %(webserver)s and "\ + "urls.dns_name LIKE %(domain)s)" + + + +default_conf_file="./etc/ot_certs.ini" +logging.basicConfig(level=logging.INFO) +logger = logging.getLogger() + + +def init_prog(argv): + """ + Parse command line args and config file + """ + + parser = argparse.ArgumentParser( + description="Manage LetsEncrypt certificates") + parser.add_argument("-c", "--config", type=open, + required=False, + default=default_conf_file, + help="Specifity config file (default: {})".format(default_conf_file)) + args = parser.parse_args() + try: + config = configparser.ConfigParser() + config.read_file(args.config) + except Exception as e: + logger.error("Error parsing configuration {}".format(e)) + exit(-1) + return args, config + + + +def connect_db(conf_dict): + + try: + cnx = mysql.connector.connect(**conf_dict) + except mysql.connector.Error as err: + if err.errno == mysql.connector.errorcode.ER_ACCESS_DENIED_ERROR: + logger.error("Something is wrong with your user name or password") + elif err.errno == mysql.connector.errorcode.ER_BAD_DB_ERROR: + logger.error("Database does not exist") + else: + logger.error(err) + return None + return cnx + + +def get_subdomain_list(config, domain, ot_conn, ex_subdomains=tuple()): + """ + Return a Python list containing subdomain of domain paramer + eg: ['app.arkiwi.org'] + """ + result_dict=dict() + + + ot_cursor=ot_conn.cursor() + ot_cursor.execute(subdomains_list_stmt, {'webserver':config['main']['webserver'].strip(" '\""), + 'domain':"%{}%".format(domain)}) + + subdomains_res = ot_cursor.fetchall() + ot_cursor.close() + + #if filtered: + subdomains_filtered = [s[0].decode('utf-8') for s in subdomains_res + if not(s[0].decode('utf-8').startswith(ex_subdomains))] + # else: + # subdomains_filtered = [s[0].decode('utf-8') for s in subdomains_res] + + return subdomains_filtered + + +def get_domain_list(config, ot_conn, dns_conn): + """ + Return a dictionary of domains and properties + eg:{'indivia.net': {manage_ns=True, domain_id=92}, + 'metalabs.org': {managed_ns=False}} + """ + result_dict=dict() + + ot_cursor=ot_conn.cursor() + ot_cursor.execute(domains_list_stmt, {'webserver':config['main']['webserver'].strip(" '\"")}) + + dns_cursor=dns_conn.cursor() + for domain_barr, in ot_cursor: + domain_name = domain_barr.decode("utf-8") + try: + dns_cursor.execute(domain_id_stmt, {'domain':domain_name}) + except Exception as e: + logger.error(e) + exit(-1) + dns_res = dns_cursor.fetchall() + if dns_cursor.rowcount == 1 : + result_dict.update({domain_name: {'managed_ns':True, 'domain_id':dns_res[0][0]}}) + elif dns_cursor.rowcount == 0: + result_dict.update({domain_name: {'managed_ns':False, 'domain_id':None}}) + else: + logger.error('Unexpected result for domain {}'.format(domain_name)) + + dns_cursor.close() + ot_cursor.close() + return result_dict + + +def acme_request(config, domain_name, acme_test='DNS-01', dryrun=False, domains_list=None): + + args = config['certbot']['base_args'] + if dryrun: + args += "--dry-run " + args += "-m {} ".format(config['certbot']['email']) + args += "--server {} ".format(config['certbot']['server']) + + if acme_test == 'DNS-01': + args += "--manual certonly " + args += "--preferred-challenges dns-01 " + args += "--manuale-auth-hook {} ".format(config['certbot']['auth_hook']) + args += "--manuale-cleanup-hook {} ".format(config['certbot']['cleanup_hook']) + args += "-d {},*.{}".format(domain_name, domain_name) + elif acme_test == 'HTTP-01': + args += "--webroot certonly " + args += "--preferred-challenges http-01 " + args += "-w {}/{}/htdocs ".format(config['apache']['webroot'], domain_name) + if domains_list is None: + args += "-d {}".format(domain_name) + else: + args += "--expand --cert-name {} ".format(domain_name) + args += "-d {}".format(",".join(domains_list)) + else: + logger.error('acme test {} not supported'.format(acme_test)) + return False + if dryrun: + logging.info("{} {}".format(config['certbot']['bin'], args)) + else: + os.system("{} {}".format(config['certbot']['bin'], args)) + + return True + +def symlink_force(target, link_name): + try: + os.symlink(target, link_name) + except e: + if e.errno == errno.EEXIST: + os.remove(link_name) + os.symlink(target, link_name) + else: + raise e + +def link_cert(config, source, dest, dryrun=False): + src_name = os.path.join(config['certbot']['live_certificates_dir'], source) + link_name = os.path.join(config['apache']['certificates_root'], dest) + if dryrun: + logger.info('{} -> {}'.format(link_name,src_name)) + return + else: + symlink_force(src_name, link_name) + + +if __name__ == '__main__': + args, config = init_prog(sys.argv) + + dryrun=config['main'].getboolean('dryrun') + + ot_conn=connect_db(dict(config['ot_db'])) + dns_conn=connect_db(dict(config['dns_db'])) + + # Caso speciale per le webmail + vhost_name = config['webmail']['vhost'].strip() + webmails_list = ["webmail.{}".format(d.strip()) for d in config['webmail']['domains'].split(',')] + if acme_request(config, vhost_name, acme_test='HTTP-01', dryrun=dryrun, domains_list=webmails_list): + link_cert(config, vhost_name, vhost_name, dryrun=dryrun) + else: + logger.error('Error asking certificate for {}'.format(vhost_name)) + + # Subdomains da escludere + ex_subdomains = tuple([s.strip() for s in config['main']['special_subdomains'].split(',') if len(s.strip())>0]) + domains_dict = get_domain_list(config, ot_conn, dns_conn) + + for domain_name, domain_feat in domains_dict.items(): + domain_feat['subdomains']=get_subdomain_list(config, domain_name, ot_conn, ex_subdomains=ex_subdomains) + # Controlla se i nameserver sono gestiti da noi + if domain_feat['managed_ns']: + # Nel caso il nameserver sia gestito, chiedi certificati per il dominio e la wildcard + if acme_request(config, domain_name, acme_test='DNS-01', dryrun=dryrun): + link_cert(config, domain_name, domain_name, dryrun=dryrun) + # Crea il link per ogni subdomain + for subdomain in domain_feat['subdomains']: + link_cert(config, domain_name, subdomain, dryrun=dryrun) + else: + # Nel caso i nameserver NON siano gestiti, allora chiedi un certificato per ogni sottodominio + # Crea il link per ogni subdomain + for subdomain in domain_feat['subdomains']: + if acme_request(config, subdomain, acme_test='HTTP-01', dryrun=dryrun): + link_cert(config, subdomain, subdomain, dryrun=dryrun) + + ot_conn.close() + dns_conn.close()