123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- """qrcode_reader
- Scan the output of a serial QR Code reader.
- Copyright 2017 Davide Alberani <da@erlug.linux.it>
- RaspiBO <info@raspibo.org>
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- """
import argparse
import configparser
import datetime
import io
import json
import logging
import os
import sys
import time
import urllib
import urllib.parse

import requests
import serial
from requests.packages.urllib3.exceptions import InsecureRequestWarning
# Timeout handed to every HTTP request issued through the session
# (passed as the ``timeout=`` argument, i.e. seconds).
TIMEOUT = 3

# Root logging is configured at INFO; the HTTP libraries are capped at
# WARNING so their per-request chatter does not drown our own messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger('qrcode_reader')
logging.getLogger('urllib3').setLevel(logging.WARNING)
logging.getLogger('requests').setLevel(logging.WARNING)

# Requests may be sent with certificate verification turned off, so the
# matching urllib3 warning is silenced for the whole process.
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
def convert_obj(obj):
    """Convert a single configuration value to a more natural Python type.

    Conversion rules, applied in order:

    * anything ``int()`` accepts becomes an ``int`` (so ``'42'`` -> ``42``;
      note that this also applies to non-string inputs such as floats and
      booleans, exactly as ``int()`` would convert them);
    * the strings ``true``/``on``/``yes`` and ``false``/``off``/``no``
      (case-insensitive) become ``True`` / ``False``;
    * the literal string ``%NOW%`` becomes the current UTC time rendered
      with ``str()`` on a naive ``datetime``;
    * everything else is returned unchanged.

    :param obj: the value to convert
    :returns: the converted value, or ``obj`` itself when no rule applies
    """
    try:
        return int(obj)
    except (TypeError, ValueError, OverflowError):
        # Not an integer: fall through to the string rules.  Only the errors
        # int() raises for unsuitable input are caught, so that
        # KeyboardInterrupt / SystemExit are never swallowed here.
        pass
    if isinstance(obj, str):
        lowered = obj.lower()
        if lowered in ('true', 'on', 'yes'):
            return True
        if lowered in ('false', 'off', 'no'):
            return False
        if obj == '%NOW%':
            # Same naive-UTC string the deprecated datetime.utcnow() produced:
            # take an aware UTC timestamp and drop the tzinfo before str().
            now = datetime.datetime.now(datetime.timezone.utc)
            return str(now.replace(tzinfo=None))
    return obj
def convert(seq):
    """Recursively apply ``convert_obj`` to every leaf of a nested structure.

    Dictionaries are rebuilt with the same keys and converted values; lists
    and tuples are both returned as lists of converted items; any other
    value is handed directly to ``convert_obj``.

    :param seq: a dict, list, tuple or scalar value
    :returns: a new structure of the same nesting with converted leaves
    """
    if isinstance(seq, dict):
        return {key: convert(item) for key, item in seq.items()}
    if isinstance(seq, (list, tuple)):
        return list(map(convert, seq))
    return convert_obj(seq)
class ImprovedEncoder(json.JSONEncoder):
    """JSON encoder that additionally handles date/time objects and sets.

    ``datetime``, ``date``, ``time`` and ``timedelta`` instances are emitted
    as their ``str()`` representation; sets are emitted as lists.  Any other
    unsupported object is delegated to the stock encoder, which raises
    ``TypeError``.
    """

    def default(self, o):
        """Return a JSON-serializable stand-in for ``o``."""
        if isinstance(o, set):
            return list(o)
        stringified_types = (
            datetime.datetime,
            datetime.date,
            datetime.time,
            datetime.timedelta,
        )
        if isinstance(o, stringified_types):
            try:
                return str(o)
            except Exception:
                # Could not render it; let the base class report the failure.
                pass
        return super().default(o)


# Install an instance as the json module's default encoder, so that plain
# json.dumps() calls use it too.  Note this assigns a private attribute of
# the json module.
json._default_encoder = ImprovedEncoder()
class Connector():
    """HTTP client that logs in to an EventMan server and checks tickets in.

    The configuration object must provide:

    * ``cfg['eventman']``: ``url`` (required), and optionally ``ca``,
      ``username`` and ``password``;
    * ``cfg['event']``: ``id`` and ``field`` (required), and optionally an
      integer ``limit_field``;
    * ``cfg['actions']``: the key/value pairs sent as the body of every
      check-in request.

    Creating an instance immediately performs the login.
    """

    def __init__(self, cfg):
        """Store the configuration, derive the endpoint URLs and log in.

        :param cfg: configuration mapping (normally a ``ConfigParser``)
        :raises KeyError: if a required section or option is missing
        :raises requests.exceptions.RequestException: if the login fails
        """
        self.cfg = cfg
        self.session = None
        self.url = cfg['eventman']['url']
        self.login_url = urllib.parse.urljoin(self.url, '/v1.0/login')
        # URL paths always use '/', so the path is built explicitly instead
        # of with os.path.join (wrong separator on Windows, and it would drop
        # the prefix for an id starting with '/').
        event_id = cfg['event']['id'].strip('/')
        self.checkin_url = urllib.parse.urljoin(
            self.url, '/v1.0/events/%s/tickets/' % event_id)
        self.login()

    def login(self):
        """Open a new session and authenticate against the login endpoint.

        :raises requests.exceptions.RequestException: on connection errors
            or when the server answers with an HTTP error status
        """
        eventman = self.cfg['eventman']
        logger.debug('connecting to eventman at %s', self.login_url)
        self.session = requests.Session()
        # NOTE(security): certificate verification is disabled unless the
        # configuration points at an existing CA bundle file.
        self.session.verify = False
        ca = eventman.get('ca')
        if ca and os.path.isfile(ca):
            self.session.verify = ca
        # Only send the credentials that are actually configured.
        params = {}
        for key in ('username', 'password'):
            value = eventman.get(key)
            if value:
                params[key] = value
        req = self.session.post(self.login_url, json=params, timeout=TIMEOUT)
        try:
            req.raise_for_status()
        finally:
            # Release the connection whether or not the login succeeded.
            req.connection.close()
        logger.info('connection to eventman at %s established', self.login_url)

    def checkin(self, code):
        """Send a check-in request for a scanned code and log the outcome.

        The code is truncated to ``limit_field`` characters when that option
        is set to a non-zero integer.  The ticket is looked up through the
        configured ``field`` and the ``[actions]`` section (converted with
        ``convert``) is sent as the JSON body of a PUT request.

        :param code: the scanned code
        :raises requests.exceptions.RequestException: on connection errors;
            HTTP error statuses are logged as warnings, not raised
        """
        event = self.cfg['event']
        msg = 'scanning code %s: ' % code
        limit_field = None
        try:
            limit_field = event.getint('limit_field')
        except (ValueError, AttributeError, configparser.Error) as ex:
            # Invalid number, failed interpolation, or a plain mapping with
            # no getint(): treat it as "no limit", as before.
            logger.debug('ignoring limit_field: %s', ex)
        if limit_field:
            code = code[:limit_field]
        field = event['field']
        params = {
            field: code,
            '_errorMessage': 'code: %s' % code,
            '_searchFor': '%s:%s' % (field, code),
        }
        checkin_url = self.checkin_url + '?' + urllib.parse.urlencode(params)
        payload = convert(dict(self.cfg['actions']))
        req = self.session.put(checkin_url, json=payload, timeout=TIMEOUT)
        try:
            try:
                req.raise_for_status()
            except requests.exceptions.HTTPError:
                msg += 'error: %s' % self._error_message(req)
                logger.warning(msg)
            else:
                msg += 'ok'
                logger.info(msg)
        finally:
            req.connection.close()

    @staticmethod
    def _error_message(req):
        """Extract the server's error message from a failed response."""
        try:
            return req.json().get('message')
        except (ValueError, AttributeError):
            # Body is not JSON (or not a JSON object): fall back to raw text.
            return req.text or req.reason
def scan(port, max_retries=20, retry_delay=1):
    """Yield the non-empty lines read from a serial QR-code reader.

    The port is opened with a one-second read timeout; opening is retried
    until it succeeds or ``max_retries`` attempts have failed.  Lines are
    terminated by a carriage return and are stripped of surrounding
    whitespace before being yielded.  The serial port is closed when the
    generator finishes, is closed by the caller, or the process exits.

    :param port: name of the serial device to open
    :param max_retries: number of connection attempts before giving up
    :param retry_delay: seconds to sleep between connection attempts
    :returns: a generator of decoded, stripped, non-empty lines
    :raises SystemExit: with status 2 if the port cannot be opened, or
        with status 3 if the device disconnects while reading
    """
    logger.info('trying to connect to %s, please wait...', port)
    attempt = 1
    while True:
        logger.debug('waiting for connection on port %s...', port)
        try:
            ser = serial.Serial(port=port, timeout=1)
            break
        except serial.serialutil.SerialException as ex:
            if attempt >= max_retries:
                logger.error('unable to connect: %s', ex)
                sys.exit(2)
            time.sleep(retry_delay)
            attempt += 1
    logger.info('connected to %s', port)
    try:
        ser_io = io.TextIOWrapper(io.BufferedRWPair(ser, ser, 1),
                                  newline='\r', line_buffering=True)
        while True:
            try:
                line = ser_io.readline().strip()
            except serial.serialutil.SerialException as ex:
                logger.error('disconnected: %s', ex)
                sys.exit(3)
            # An empty string means the read timed out or the line was blank.
            if line:
                yield line
    finally:
        # Runs on generator close, on sys.exit() above and on any error.
        ser.close()
if __name__ == '__main__':
    # Command-line entry point: check in a single code given with --code,
    # or keep checking in every code read from the serial reader.
    #
    # Exit statuses set here: 1 = cannot connect to the server or the single
    # check-in failed, 4 = configuration file could not be read.
    parser = argparse.ArgumentParser()
    parser.add_argument('-c', '--code', help='specify a single code', action='store')
    parser.add_argument('--config',
                        help='use a different configuration file (default: qrcode_reader.ini)',
                        action='store', default='qrcode_reader.ini')
    args = parser.parse_args()

    # `cfg` is deliberately kept as a module-level name so that any code in
    # this module looking it up as a global keeps working.
    cfg = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
    # ConfigParser.read() silently skips unreadable files and returns the
    # list of files it parsed; an empty list means nothing was loaded.
    if not cfg.read(args.config):
        logger.error('unable to read configuration file %s', args.config)
        sys.exit(4)
    if cfg.getboolean('qrcode_reader', 'debug', fallback=False):
        logger.setLevel(logging.DEBUG)

    try:
        connector = Connector(cfg)
    except Exception as ex:
        # Use a fallback so reporting a missing [eventman] section does not
        # itself raise.
        target = cfg.get('eventman', 'url', fallback='(no url configured)')
        logger.error('unable to connect to %s: %s', target, ex)
        sys.exit(1)

    if args.code:
        try:
            connector.checkin(args.code)
        except Exception as ex:
            logger.error('error at checkin for code %s: %s', args.code, ex)
            sys.exit(1)
    else:
        try:
            for code in scan(port=cfg['connection']['port']):
                try:
                    connector.checkin(code)
                except Exception as ex:
                    # One failed check-in must not stop the scanning loop.
                    logger.error('error at checkin for code %s: %s', code, ex)
        except KeyboardInterrupt:
            logger.info('exiting...')
|