Compare commits

...

1 commit

Author SHA1 Message Date
75c3828c60 Refactor in a package 2018-07-29 23:41:40 +02:00
6 changed files with 191 additions and 118 deletions

7
certo/__init__.py Normal file
View file

@ -0,0 +1,7 @@
# -*- coding: utf-8 -*-
"""Top-level package for Certo."""
__author__ = """Blallo"""
__email__ = 'blallo@autistici.org'
__version__ = '0.4.0'

120
certo/certo.py Normal file
View file

@ -0,0 +1,120 @@
# -*- coding: utf-8 -*-
"""Main module."""
import hashlib
import logging
import ssl
import socket
import sys
import time
from . import colorlog
class Certo():
"""
Main class. It expect two parameters to configure the logger:
:param debugging: toggle the output of debugging messages.
:param colored: toggle the colored log format
"""
def __init__(self, debugging, colored):
self.debugging = debugging
self.colored = colored
if debugging:
self.log_level = logging.DEBUG
else:
self.log_level = logging.INFO
if colored:
self.formatter = colorlog.ColorFormatter("%(message)s")
else:
self.formatter = logging.Formatter("[%(levelname)4s] %(message)s")
self.logger = logging.getLogger(__name__)
self.logger.setLevel(self.log_level)
self._console_handler = logging.StreamHandler(sys.stdout)
self._console_handler.setLevel(self.log_level)
self._console_handler.setFormatter(self.formatter)
self.logger.addHandler(self._console_handler)
self.logger.debug("Formatter %r" % self.formatter)
# The following inspired by:
# https://stackoverflow.com/questions/17667903/python-socket-receive-large-amount-of-data
# https://www.binarytides.com/receive-full-data-with-the-recv-socket-function-in-python/
def recv_msg(self, sock, timeout):
# Read message length and unpack it into an integer
data = b''
begin = time.time()
n = 1024
while len(data) < n or time.time() - begin < timeout:
try:
data += sock.recv(n - len(data))
self.logger.debug("Partial: %r" % data)
except socket.timeout:
pass
if b'Ready to start TLS\r\n' in data:
break
time.sleep(0.1)
if len(data) == 0:
return None
self.logger.debug("Data: %r" % data)
return data
def establish_conn(self, addr, port, starttls):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
try:
if starttls:
self.logger.debug("Using STARTTLS")
self.logger.debug("Connecting to %s:%s" % (addr, port))
sock.connect((addr, port))
sock.send(b"STARTTLS\r\n")
data = self.recv_msg(sock, 5)
if data is None:
raise socket.error
wrapped_socket = ssl.wrap_socket(sock)
else:
wrapped_socket = ssl.wrap_socket(sock)
wrapped_socket.connect((addr, port))
return wrapped_socket.getpeercert(True)
except socket.timeout:
self.logger.error("Timeout trying to connect to %s:%s" % (addr, port))
sys.exit(2)
except socket.gaierror:
self.logger.error("Failure in name resolution: %s" % addr)
sys.exit(3)
wrapped_socket.close()
def get_cert(self, addr, port, starttls):
cert = self.establish_conn(addr, port, starttls)
pem_cert = ssl.DER_cert_to_PEM_cert(cert)
self.logger.debug("The certificate is:\n%s" % pem_cert)
return cert
def capitalize_and_colons(self, in_hash):
in_hash = in_hash.upper()
new_hash = in_hash[0:2]
for i in range(2, len(in_hash), 2):
new_hash += ":" + in_hash[i:i+2]
return new_hash
def compute_fingerprints(self, cert, with_colons):
thumb_md5 = hashlib.md5(cert).hexdigest()
thumb_sha1 = hashlib.sha1(cert).hexdigest()
thumb_sha256 = hashlib.sha256(cert).hexdigest()
self.logger.info("MD5: " + thumb_md5)
if with_colons:
self.logger.info(" " + self.capitalize_and_colons(thumb_md5))
self.logger.info("SHA1: " + thumb_sha1)
if with_colons:
self.logger.info(" " + self.capitalize_and_colons(thumb_sha1))
self.logger.info("SHA256: " + thumb_sha256)
if with_colons:
self.logger.info(" " + self.capitalize_and_colons(thumb_sha256))

31
certo/cli.py Normal file
View file

@ -0,0 +1,31 @@
# -*- coding: utf-8 -*-
"""Console script for certo."""
import ssl
import click
from . import certo
@click.command()
@click.argument('address')#, help="address to be used to retrieve the certificate")
@click.option('-p', '--port', default=443, type=click.IntRange(1,65535), help="The port to connect to.")
@click.option('--starttls', is_flag=True, flag_value=True, help="Whether to use starttls on connection.")
@click.option('--debug/--nodebug', is_flag=True, flag_value=False, help="Debug output.")
@click.option('-o', '--output', help="Path to save the certificate to.")
@click.option('--colons/--nocolons', is_flag=True, flag_value=False, help="Whether to output also hashed with colons.")
@click.option('--colors/--nocolors', is_flag=True, flag_value=True, help="Toggles the colored output.")
def do_it(address, port, starttls, debug, output, colons, colors):
grabber = certo.Certo(debug, colors)
cert = grabber.get_cert(address, port, starttls)
if output:
with open(output, 'w') as f:
grabber.logger.debug("Opening file %s" % output)
f.write(ssl.DER_cert_to_PEM_cert(cert))
grabber.logger.info("The certificate has been saved to %s" % output)
grabber.compute_fingerprints(cert, colons)
if __name__ == '__main__':
do_it()

27
certo/colorlog.py Normal file
View file

@ -0,0 +1,27 @@
# -*- encoding: utf-8 -*-
"""
Module to produce colored log output.
"""
# Taken from http://uran198.github.io/en/python/2016/07/12/colorful-python-logging.html
import logging
import colorama
import copy
LOG_COLORS = {
logging.ERROR: colorama.Fore.RED,
logging.WARNING: colorama.Fore.YELLOW,
logging.INFO: colorama.Fore.WHITE,
logging.DEBUG: colorama.Fore.BLUE,
}
class ColorFormatter(logging.Formatter):
def format(self, record, *args, **kwargs):
new_record = copy.copy(record)
if new_record.levelno in LOG_COLORS:
new_record.msg= "{color_begin}{message}{color_end}".format(
message=new_record.msg,
color_begin=LOG_COLORS[new_record.levelno],
color_end=colorama.Style.RESET_ALL,
)
return super(ColorFormatter, self).format(new_record, *args, **kwargs)

View file

@ -1,114 +0,0 @@
#!/usr/bin/env python3
import hashlib
import logging
import ssl
import socket
import time
import click
logging.basicConfig(level=logging.INFO,
format='[%(levelname)-4s] %(message)s',
datefmt='%Y-%m-%d %H:%M')
logger = logging.getLogger('certo')
# The following inspired by:
# https://stackoverflow.com/questions/17667903/python-socket-receive-large-amount-of-data
# https://www.binarytides.com/receive-full-data-with-the-recv-socket-function-in-python/
def recv_msg(sock, timeout):
# Read message length and unpack it into an integer
data = b''
begin = time.time()
n = 1024
while len(data) < n or time.time() - begin < timeout:
try:
data += sock.recv(n - len(data))
logger.debug("Partial: %r" % data)
except socket.timeout:
pass
if b'Ready to start TLS\r\n' in data:
break
time.sleep(0.1)
if len(data) == 0:
return None
logger.debug("Data: %r" % data)
return data
def establish_conn(addr, port, starttls):
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.settimeout(5)
try:
if starttls:
logger.debug("Using STARTTLS")
logger.debug("Connecting to %s:%s" % (addr, port))
sock.connect((addr, port))
sock.send(b"STARTTLS\r\n")
data = recv_msg(sock, 5)
if data is None:
raise socket.error
wrapped_socket = ssl.wrap_socket(sock)
else:
wrapped_socket = ssl.wrap_socket(sock)
wrapped_socket.connect((addr, port))
return wrapped_socket.getpeercert(True)
finally:
wrapped_socket.close()
def get_cert(addr, port, starttls):
cert = establish_conn(addr, port, starttls)
pem_cert = ssl.DER_cert_to_PEM_cert(cert)
logger.debug("The certificate is:\n%s" % pem_cert)
return cert
def capitalize_and_colons(in_hash):
in_hash = in_hash.upper()
new_hash = in_hash[0:2]
for i in range(2, len(in_hash), 2):
new_hash += ":" + in_hash[i:i+2]
return new_hash
def compute_fingerprints(cert, with_colons):
thumb_md5 = hashlib.md5(cert).hexdigest()
thumb_sha1 = hashlib.sha1(cert).hexdigest()
thumb_sha256 = hashlib.sha256(cert).hexdigest()
logger.info("MD5: " + thumb_md5)
if with_colons:
logger.info(" " + capitalize_and_colons(thumb_md5))
logger.info("SHA1: " + thumb_sha1)
if with_colons:
logger.info(" " + capitalize_and_colons(thumb_sha1))
logger.info("SHA256: " + thumb_sha256)
if with_colons:
logger.info(" " + capitalize_and_colons(thumb_sha256))
@click.command()
@click.argument('address')#, help="address to be used to retrieve the certificate")
@click.option('-p', '--port', default=443, type=click.IntRange(1,65535), help="The port to connect to.")
@click.option('--starttls', is_flag=True, flag_value=True, help="Whether to use starttls on connection.")
@click.option('--debug/--nodebug', is_flag=True, flag_value=False, help="Debug output.")
@click.option('-o', '--output', help="Path to save the certificate to.")
@click.option('--colons/--nocolons', is_flag=True, flag_value=False, help="Whether to output also hashed with colons")
def doit(address, port, starttls, debug, output, colons):
if debug:
logger.setLevel(logging.DEBUG)
cert = get_cert(address, port, starttls)
if output:
with open(output, 'w') as f:
logger.debug("Opening file %s" % output)
f.write(ssl.DER_cert_to_PEM_cert(cert))
logger.info("The certificate has been saved to %s" % output)
compute_fingerprints(cert, colons)
if __name__ == '__main__':
doit()

View file

@ -1,14 +1,16 @@
from setuptools import setup from setuptools import find_packages, setup
setup( setup(
name='certo', name='certo',
version='0.3', version='0.4',
py_modules=['get_cert'], py_modules=['certo'],
packages=find_packages(),
install_requires=[ install_requires=[
'Click', 'Click',
'colorama',
], ],
entry_points=''' entry_points='''
[console_scripts] [console_scripts]
certo=get_cert:doit certo=certo.cli:do_it
''', ''',
) )