Compare commits
1 commit
Author | SHA1 | Date | |
---|---|---|---|
75c3828c60 |
6 changed files with 191 additions and 118 deletions
7
certo/__init__.py
Normal file
7
certo/__init__.py
Normal file
|
@ -0,0 +1,7 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
"""Top-level package for Certo."""
|
||||||
|
|
||||||
|
__author__ = """Blallo"""
|
||||||
|
__email__ = 'blallo@autistici.org'
|
||||||
|
__version__ = '0.4.0'
|
120
certo/certo.py
Normal file
120
certo/certo.py
Normal file
|
@ -0,0 +1,120 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
"""Main module."""
|
||||||
|
|
||||||
|
import hashlib
|
||||||
|
import logging
|
||||||
|
import ssl
|
||||||
|
import socket
|
||||||
|
import sys
|
||||||
|
import time
|
||||||
|
|
||||||
|
from . import colorlog
|
||||||
|
|
||||||
|
|
||||||
|
class Certo():
|
||||||
|
"""
|
||||||
|
Main class. It expect two parameters to configure the logger:
|
||||||
|
|
||||||
|
:param debugging: toggle the output of debugging messages.
|
||||||
|
:param colored: toggle the colored log format
|
||||||
|
"""
|
||||||
|
def __init__(self, debugging, colored):
|
||||||
|
self.debugging = debugging
|
||||||
|
self.colored = colored
|
||||||
|
if debugging:
|
||||||
|
self.log_level = logging.DEBUG
|
||||||
|
else:
|
||||||
|
self.log_level = logging.INFO
|
||||||
|
|
||||||
|
if colored:
|
||||||
|
self.formatter = colorlog.ColorFormatter("%(message)s")
|
||||||
|
else:
|
||||||
|
self.formatter = logging.Formatter("[%(levelname)4s] %(message)s")
|
||||||
|
|
||||||
|
self.logger = logging.getLogger(__name__)
|
||||||
|
self.logger.setLevel(self.log_level)
|
||||||
|
self._console_handler = logging.StreamHandler(sys.stdout)
|
||||||
|
self._console_handler.setLevel(self.log_level)
|
||||||
|
self._console_handler.setFormatter(self.formatter)
|
||||||
|
self.logger.addHandler(self._console_handler)
|
||||||
|
self.logger.debug("Formatter %r" % self.formatter)
|
||||||
|
|
||||||
|
# The following inspired by:
|
||||||
|
# https://stackoverflow.com/questions/17667903/python-socket-receive-large-amount-of-data
|
||||||
|
# https://www.binarytides.com/receive-full-data-with-the-recv-socket-function-in-python/
|
||||||
|
def recv_msg(self, sock, timeout):
|
||||||
|
# Read message length and unpack it into an integer
|
||||||
|
data = b''
|
||||||
|
begin = time.time()
|
||||||
|
n = 1024
|
||||||
|
while len(data) < n or time.time() - begin < timeout:
|
||||||
|
try:
|
||||||
|
data += sock.recv(n - len(data))
|
||||||
|
self.logger.debug("Partial: %r" % data)
|
||||||
|
except socket.timeout:
|
||||||
|
pass
|
||||||
|
if b'Ready to start TLS\r\n' in data:
|
||||||
|
break
|
||||||
|
time.sleep(0.1)
|
||||||
|
if len(data) == 0:
|
||||||
|
return None
|
||||||
|
self.logger.debug("Data: %r" % data)
|
||||||
|
return data
|
||||||
|
|
||||||
|
def establish_conn(self, addr, port, starttls):
|
||||||
|
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
sock.settimeout(5)
|
||||||
|
try:
|
||||||
|
if starttls:
|
||||||
|
self.logger.debug("Using STARTTLS")
|
||||||
|
self.logger.debug("Connecting to %s:%s" % (addr, port))
|
||||||
|
sock.connect((addr, port))
|
||||||
|
sock.send(b"STARTTLS\r\n")
|
||||||
|
data = self.recv_msg(sock, 5)
|
||||||
|
if data is None:
|
||||||
|
raise socket.error
|
||||||
|
wrapped_socket = ssl.wrap_socket(sock)
|
||||||
|
else:
|
||||||
|
wrapped_socket = ssl.wrap_socket(sock)
|
||||||
|
wrapped_socket.connect((addr, port))
|
||||||
|
|
||||||
|
return wrapped_socket.getpeercert(True)
|
||||||
|
|
||||||
|
except socket.timeout:
|
||||||
|
self.logger.error("Timeout trying to connect to %s:%s" % (addr, port))
|
||||||
|
sys.exit(2)
|
||||||
|
|
||||||
|
except socket.gaierror:
|
||||||
|
self.logger.error("Failure in name resolution: %s" % addr)
|
||||||
|
sys.exit(3)
|
||||||
|
|
||||||
|
wrapped_socket.close()
|
||||||
|
|
||||||
|
def get_cert(self, addr, port, starttls):
|
||||||
|
cert = self.establish_conn(addr, port, starttls)
|
||||||
|
pem_cert = ssl.DER_cert_to_PEM_cert(cert)
|
||||||
|
self.logger.debug("The certificate is:\n%s" % pem_cert)
|
||||||
|
|
||||||
|
return cert
|
||||||
|
|
||||||
|
def capitalize_and_colons(self, in_hash):
|
||||||
|
in_hash = in_hash.upper()
|
||||||
|
new_hash = in_hash[0:2]
|
||||||
|
for i in range(2, len(in_hash), 2):
|
||||||
|
new_hash += ":" + in_hash[i:i+2]
|
||||||
|
return new_hash
|
||||||
|
|
||||||
|
def compute_fingerprints(self, cert, with_colons):
|
||||||
|
thumb_md5 = hashlib.md5(cert).hexdigest()
|
||||||
|
thumb_sha1 = hashlib.sha1(cert).hexdigest()
|
||||||
|
thumb_sha256 = hashlib.sha256(cert).hexdigest()
|
||||||
|
self.logger.info("MD5: " + thumb_md5)
|
||||||
|
if with_colons:
|
||||||
|
self.logger.info(" " + self.capitalize_and_colons(thumb_md5))
|
||||||
|
self.logger.info("SHA1: " + thumb_sha1)
|
||||||
|
if with_colons:
|
||||||
|
self.logger.info(" " + self.capitalize_and_colons(thumb_sha1))
|
||||||
|
self.logger.info("SHA256: " + thumb_sha256)
|
||||||
|
if with_colons:
|
||||||
|
self.logger.info(" " + self.capitalize_and_colons(thumb_sha256))
|
31
certo/cli.py
Normal file
31
certo/cli.py
Normal file
|
@ -0,0 +1,31 @@
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
|
"""Console script for certo."""
|
||||||
|
|
||||||
|
import ssl
|
||||||
|
|
||||||
|
import click
|
||||||
|
|
||||||
|
from . import certo
|
||||||
|
|
||||||
|
|
||||||
|
@click.command()
|
||||||
|
@click.argument('address')#, help="address to be used to retrieve the certificate")
|
||||||
|
@click.option('-p', '--port', default=443, type=click.IntRange(1,65535), help="The port to connect to.")
|
||||||
|
@click.option('--starttls', is_flag=True, flag_value=True, help="Whether to use starttls on connection.")
|
||||||
|
@click.option('--debug/--nodebug', is_flag=True, flag_value=False, help="Debug output.")
|
||||||
|
@click.option('-o', '--output', help="Path to save the certificate to.")
|
||||||
|
@click.option('--colons/--nocolons', is_flag=True, flag_value=False, help="Whether to output also hashed with colons.")
|
||||||
|
@click.option('--colors/--nocolors', is_flag=True, flag_value=True, help="Toggles the colored output.")
|
||||||
|
def do_it(address, port, starttls, debug, output, colons, colors):
|
||||||
|
grabber = certo.Certo(debug, colors)
|
||||||
|
cert = grabber.get_cert(address, port, starttls)
|
||||||
|
if output:
|
||||||
|
with open(output, 'w') as f:
|
||||||
|
grabber.logger.debug("Opening file %s" % output)
|
||||||
|
f.write(ssl.DER_cert_to_PEM_cert(cert))
|
||||||
|
grabber.logger.info("The certificate has been saved to %s" % output)
|
||||||
|
grabber.compute_fingerprints(cert, colons)
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
do_it()
|
27
certo/colorlog.py
Normal file
27
certo/colorlog.py
Normal file
|
@ -0,0 +1,27 @@
|
||||||
|
# -*- encoding: utf-8 -*-
|
||||||
|
"""
|
||||||
|
Module to produce colored log output.
|
||||||
|
"""
|
||||||
|
# Taken from http://uran198.github.io/en/python/2016/07/12/colorful-python-logging.html
|
||||||
|
import logging
|
||||||
|
import colorama
|
||||||
|
import copy
|
||||||
|
|
||||||
|
LOG_COLORS = {
|
||||||
|
logging.ERROR: colorama.Fore.RED,
|
||||||
|
logging.WARNING: colorama.Fore.YELLOW,
|
||||||
|
logging.INFO: colorama.Fore.WHITE,
|
||||||
|
logging.DEBUG: colorama.Fore.BLUE,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
class ColorFormatter(logging.Formatter):
|
||||||
|
def format(self, record, *args, **kwargs):
|
||||||
|
new_record = copy.copy(record)
|
||||||
|
if new_record.levelno in LOG_COLORS:
|
||||||
|
new_record.msg= "{color_begin}{message}{color_end}".format(
|
||||||
|
message=new_record.msg,
|
||||||
|
color_begin=LOG_COLORS[new_record.levelno],
|
||||||
|
color_end=colorama.Style.RESET_ALL,
|
||||||
|
)
|
||||||
|
return super(ColorFormatter, self).format(new_record, *args, **kwargs)
|
114
get_cert.py
114
get_cert.py
|
@ -1,114 +0,0 @@
|
||||||
#!/usr/bin/env python3
|
|
||||||
|
|
||||||
import hashlib
|
|
||||||
import logging
|
|
||||||
import ssl
|
|
||||||
import socket
|
|
||||||
import time
|
|
||||||
|
|
||||||
import click
|
|
||||||
|
|
||||||
|
|
||||||
logging.basicConfig(level=logging.INFO,
|
|
||||||
format='[%(levelname)-4s] %(message)s',
|
|
||||||
datefmt='%Y-%m-%d %H:%M')
|
|
||||||
logger = logging.getLogger('certo')
|
|
||||||
|
|
||||||
|
|
||||||
# The following inspired by:
|
|
||||||
# https://stackoverflow.com/questions/17667903/python-socket-receive-large-amount-of-data
|
|
||||||
# https://www.binarytides.com/receive-full-data-with-the-recv-socket-function-in-python/
|
|
||||||
def recv_msg(sock, timeout):
|
|
||||||
# Read message length and unpack it into an integer
|
|
||||||
data = b''
|
|
||||||
begin = time.time()
|
|
||||||
n = 1024
|
|
||||||
while len(data) < n or time.time() - begin < timeout:
|
|
||||||
try:
|
|
||||||
data += sock.recv(n - len(data))
|
|
||||||
logger.debug("Partial: %r" % data)
|
|
||||||
except socket.timeout:
|
|
||||||
pass
|
|
||||||
if b'Ready to start TLS\r\n' in data:
|
|
||||||
break
|
|
||||||
time.sleep(0.1)
|
|
||||||
if len(data) == 0:
|
|
||||||
return None
|
|
||||||
logger.debug("Data: %r" % data)
|
|
||||||
return data
|
|
||||||
|
|
||||||
|
|
||||||
def establish_conn(addr, port, starttls):
|
|
||||||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
||||||
sock.settimeout(5)
|
|
||||||
try:
|
|
||||||
if starttls:
|
|
||||||
logger.debug("Using STARTTLS")
|
|
||||||
logger.debug("Connecting to %s:%s" % (addr, port))
|
|
||||||
sock.connect((addr, port))
|
|
||||||
sock.send(b"STARTTLS\r\n")
|
|
||||||
data = recv_msg(sock, 5)
|
|
||||||
if data is None:
|
|
||||||
raise socket.error
|
|
||||||
wrapped_socket = ssl.wrap_socket(sock)
|
|
||||||
else:
|
|
||||||
wrapped_socket = ssl.wrap_socket(sock)
|
|
||||||
wrapped_socket.connect((addr, port))
|
|
||||||
|
|
||||||
return wrapped_socket.getpeercert(True)
|
|
||||||
|
|
||||||
finally:
|
|
||||||
wrapped_socket.close()
|
|
||||||
|
|
||||||
|
|
||||||
def get_cert(addr, port, starttls):
|
|
||||||
cert = establish_conn(addr, port, starttls)
|
|
||||||
pem_cert = ssl.DER_cert_to_PEM_cert(cert)
|
|
||||||
logger.debug("The certificate is:\n%s" % pem_cert)
|
|
||||||
|
|
||||||
return cert
|
|
||||||
|
|
||||||
|
|
||||||
def capitalize_and_colons(in_hash):
|
|
||||||
in_hash = in_hash.upper()
|
|
||||||
new_hash = in_hash[0:2]
|
|
||||||
for i in range(2, len(in_hash), 2):
|
|
||||||
new_hash += ":" + in_hash[i:i+2]
|
|
||||||
return new_hash
|
|
||||||
|
|
||||||
|
|
||||||
def compute_fingerprints(cert, with_colons):
|
|
||||||
thumb_md5 = hashlib.md5(cert).hexdigest()
|
|
||||||
thumb_sha1 = hashlib.sha1(cert).hexdigest()
|
|
||||||
thumb_sha256 = hashlib.sha256(cert).hexdigest()
|
|
||||||
logger.info("MD5: " + thumb_md5)
|
|
||||||
if with_colons:
|
|
||||||
logger.info(" " + capitalize_and_colons(thumb_md5))
|
|
||||||
logger.info("SHA1: " + thumb_sha1)
|
|
||||||
if with_colons:
|
|
||||||
logger.info(" " + capitalize_and_colons(thumb_sha1))
|
|
||||||
logger.info("SHA256: " + thumb_sha256)
|
|
||||||
if with_colons:
|
|
||||||
logger.info(" " + capitalize_and_colons(thumb_sha256))
|
|
||||||
|
|
||||||
|
|
||||||
@click.command()
|
|
||||||
@click.argument('address')#, help="address to be used to retrieve the certificate")
|
|
||||||
@click.option('-p', '--port', default=443, type=click.IntRange(1,65535), help="The port to connect to.")
|
|
||||||
@click.option('--starttls', is_flag=True, flag_value=True, help="Whether to use starttls on connection.")
|
|
||||||
@click.option('--debug/--nodebug', is_flag=True, flag_value=False, help="Debug output.")
|
|
||||||
@click.option('-o', '--output', help="Path to save the certificate to.")
|
|
||||||
@click.option('--colons/--nocolons', is_flag=True, flag_value=False, help="Whether to output also hashed with colons")
|
|
||||||
def doit(address, port, starttls, debug, output, colons):
|
|
||||||
if debug:
|
|
||||||
logger.setLevel(logging.DEBUG)
|
|
||||||
cert = get_cert(address, port, starttls)
|
|
||||||
if output:
|
|
||||||
with open(output, 'w') as f:
|
|
||||||
logger.debug("Opening file %s" % output)
|
|
||||||
f.write(ssl.DER_cert_to_PEM_cert(cert))
|
|
||||||
logger.info("The certificate has been saved to %s" % output)
|
|
||||||
compute_fingerprints(cert, colons)
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
|
||||||
doit()
|
|
10
setup.py
10
setup.py
|
@ -1,14 +1,16 @@
|
||||||
from setuptools import setup
|
from setuptools import find_packages, setup
|
||||||
|
|
||||||
setup(
|
setup(
|
||||||
name='certo',
|
name='certo',
|
||||||
version='0.3',
|
version='0.4',
|
||||||
py_modules=['get_cert'],
|
py_modules=['certo'],
|
||||||
|
packages=find_packages(),
|
||||||
install_requires=[
|
install_requires=[
|
||||||
'Click',
|
'Click',
|
||||||
|
'colorama',
|
||||||
],
|
],
|
||||||
entry_points='''
|
entry_points='''
|
||||||
[console_scripts]
|
[console_scripts]
|
||||||
certo=get_cert:doit
|
certo=certo.cli:do_it
|
||||||
''',
|
''',
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in a new issue