#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""qrcode_reader

Scan the output of a serial QR Code reader.

Copyright 2017 Davide Alberani RaspiBO

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""

import os
import io
import sys
import time
import json
import urllib
import urllib.parse
import logging
import argparse
import datetime
import configparser

import serial
import requests
from requests.packages.urllib3.exceptions import InsecureRequestWarning

# Timeout, in seconds, applied to every HTTP request sent to the server.
TIMEOUT = 3
# How many times scan() tries to open the serial port before giving up.
MAX_CONNECT_ATTEMPTS = 20

logger = logging.getLogger('qrcode_reader')
logging.basicConfig(level=logging.INFO)
logging.getLogger('requests').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)
# Certificate verification is disabled unless a CA file is configured (see
# Connector.login), so silence the warning urllib3 would emit on each request.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)


def convert_obj(obj):
    """Convert a scalar configuration value to a richer Python type.

    Values that ``int()`` accepts become integers.  The strings
    true/on/yes and false/off/no (case-insensitive) become booleans, and
    the literal string ``%NOW%`` becomes the current UTC time rendered as
    a string.  Anything else is returned unchanged.
    """
    try:
        return int(obj)
    except (TypeError, ValueError, OverflowError):
        # Not a number: fall through to the string-based conversions.
        pass
    if isinstance(obj, str):
        lowered = obj.lower()
        if lowered in ('true', 'on', 'yes'):
            return True
        if lowered in ('false', 'off', 'no'):
            return False
        if obj == '%NOW%':
            # Naive UTC timestamp: same text as the deprecated utcnow().
            now = datetime.datetime.now(datetime.timezone.utc)
            return str(now.replace(tzinfo=None))
    return obj


def convert(seq):
    """Recursively apply :func:`convert_obj` to a nested structure.

    Dictionaries are rebuilt with converted values (keys are untouched),
    lists and tuples are returned as lists of converted items, and any
    other value is passed to :func:`convert_obj`.
    """
    if isinstance(seq, dict):
        return {key: convert(item) for key, item in seq.items()}
    if isinstance(seq, (list, tuple)):
        return [convert(item) for item in seq]
    return convert_obj(seq)


class ImprovedEncoder(json.JSONEncoder):
    """Enhance the default JSON encoder to serialize datetime instances."""

    def default(self, o):
        """Serialize date/time objects as strings and sets as lists."""
        if isinstance(o, (datetime.datetime, datetime.date,
                          datetime.time, datetime.timedelta)):
            try:
                return str(o)
            except Exception:
                # Fall back to the base class, which raises TypeError.
                pass
        elif isinstance(o, set):
            return list(o)
        return json.JSONEncoder.default(self, o)


# Inject our class as the default encoder.
json._default_encoder = ImprovedEncoder()


class Connector():
    """HTTP client that logs in to an eventman server and checks tickets in."""

    def __init__(self, cfg):
        """Build the endpoint URLs from *cfg* and log in immediately.

        *cfg* is a ConfigParser-like mapping; this class reads its
        ``eventman``, ``event`` and ``actions`` sections.  Raises whatever
        :meth:`login` raises when the server cannot be reached.
        """
        self.cfg = cfg
        self.session = None
        self.url = cfg['eventman']['url']
        self.login_url = urllib.parse.urljoin(self.url, '/v1.0/login')
        # URL paths always use forward slashes, so build the path by hand
        # instead of with os.path.join (which is platform dependent).
        tickets_path = '/v1.0/events/%s/tickets/' % cfg['event']['id']
        self.checkin_url = urllib.parse.urljoin(self.url, tickets_path)
        self.login()

    def login(self):
        """Open a new session and authenticate against the server.

        Raises ``requests.exceptions.RequestException`` (including
        ``HTTPError`` for non-success status codes) on failure.
        """
        logger.debug('connecting to eventman at %s', self.login_url)
        eventman_cfg = self.cfg['eventman']
        self.session = requests.Session()
        # NOTE: TLS verification stays off unless an existing CA file is
        # configured; this mirrors the historical behavior of the script.
        self.session.verify = False
        ca = eventman_cfg.get('ca')
        if ca and os.path.isfile(ca):
            self.session.verify = ca
        params = {}
        username = eventman_cfg.get('username')
        password = eventman_cfg.get('password')
        if username:
            params['username'] = username
        if password:
            params['password'] = password
        req = self.session.post(self.login_url, json=params, timeout=TIMEOUT)
        try:
            req.raise_for_status()
        finally:
            req.connection.close()
        logger.info('connection to eventman at %s established', self.login_url)

    def checkin(self, code):
        """Send a check-in request for the scanned *code*.

        The code is truncated to ``limit_field`` characters when that
        option is set in the ``event`` section.  The outcome is logged
        (info on success, warning on an HTTP error status); network
        failures propagate as ``requests`` exceptions.
        """
        msg = 'scanning code %s: ' % code
        event_cfg = self.cfg['event']
        try:
            limit_field = event_cfg.getint('limit_field')
        except (ValueError, configparser.Error):
            # Unparsable limit: behave as if no limit were configured.
            limit_field = None
        if limit_field:
            code = code[:limit_field]
        field = event_cfg['field']
        params = {field: code,
                  '_errorMessage': 'code: %s' % code,
                  '_searchFor': '%s:%s' % (field, code)}
        checkin_url = self.checkin_url + '?' + urllib.parse.urlencode(params)
        payload = convert(dict(self.cfg['actions']))
        req = self.session.put(checkin_url, json=payload, timeout=TIMEOUT)
        try:
            try:
                req.raise_for_status()
            except requests.exceptions.HTTPError:
                logger.warning(msg + 'error: %s' % self._error_detail(req))
            else:
                logger.info(msg + 'ok')
        finally:
            req.connection.close()

    @staticmethod
    def _error_detail(req):
        """Return the server's error message, or the HTTP reason if absent."""
        try:
            return req.json().get('message')
        except (ValueError, AttributeError):
            # Body is not JSON, or not a JSON object.
            return req.reason


def scan(port):
    """Yield the non-empty lines read from the serial device at *port*.

    Opening the port is attempted up to MAX_CONNECT_ATTEMPTS times, one
    second apart; the process exits with status 2 if they all fail and
    with status 3 if the device disconnects while reading.
    """
    logger.info('trying to connect to %s, please wait...', port)
    attempt = 1
    while True:
        logger.debug('waiting for connection on port %s...', port)
        try:
            ser = serial.Serial(port=port, timeout=1)
            break
        except serial.serialutil.SerialException as ex:
            if attempt >= MAX_CONNECT_ATTEMPTS:
                logger.error('unable to connect: %s', ex)
                sys.exit(2)
            time.sleep(1)
            attempt += 1
    logger.info('connected to %s', port)
    try:
        # Lines are split on carriage returns (newline='\r').
        ser_io = io.TextIOWrapper(io.BufferedRWPair(ser, ser, 1),
                                  newline='\r', line_buffering=True)
        while True:
            try:
                line = ser_io.readline().strip()
            except serial.serialutil.SerialException as ex:
                logger.error('disconnected: %s', ex)
                sys.exit(3)
            if not line:
                # Read timed out or the line was blank: keep polling.
                continue
            yield line
    finally:
        # Release the port when the generator is closed or the process exits.
        ser.close()


def main():
    """Parse the command line, connect to the server and process codes."""
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--code', help='specify a single code',
                        action='store')
    parser.add_argument('--config',
                        help='use a different configuration file '
                             '(default: qrcode_reader.ini)',
                        action='store', default='qrcode_reader.ini')
    args = parser.parse_args()

    cfg = configparser.ConfigParser(
        interpolation=configparser.ExtendedInterpolation())
    # ConfigParser.read() silently skips unreadable files and returns the
    # list of those it parsed, so an empty result means nothing was loaded.
    if not cfg.read(args.config):
        logger.error('unable to read configuration file %s', args.config)
        sys.exit(1)
    if cfg.getboolean('qrcode_reader', 'debug', fallback=False):
        logger.setLevel(logging.DEBUG)

    try:
        connector = Connector(cfg)
    except Exception as ex:
        logger.error('unable to connect to %s: %s',
                     cfg.get('eventman', 'url', fallback=None), ex)
        sys.exit(1)

    if args.code:
        connector.checkin(args.code)
        return

    try:
        for code in scan(port=cfg['connection']['port']):
            try:
                connector.checkin(code)
            except Exception as ex:
                # One failed check-in must not stop the scanning loop.
                logger.error('error at checkin for code %s: %s', code, ex)
    except KeyboardInterrupt:
        logger.info('exiting...')


if __name__ == '__main__':
    main()