eventman/tools/qrcode_reader.py

199 lines
6.4 KiB
Python
Raw Normal View History

2017-04-01 14:43:22 +02:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""qrcode_reader
2017-04-01 14:43:22 +02:00
Scan the output of a serial QR Code reader.
Copyright 2017 Davide Alberani <da@erlug.linux.it>
RaspiBO <info@raspibo.org>
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
import os
2017-04-01 14:43:22 +02:00
import io
import sys
import time
2017-04-15 17:34:06 +02:00
import json
2017-04-01 14:43:22 +02:00
import serial
import urllib
2017-04-15 15:02:48 +02:00
import logging
import argparse
2017-04-15 17:34:06 +02:00
import datetime
2017-04-01 14:43:22 +02:00
import requests
import configparser
2017-04-15 15:02:48 +02:00
from requests.packages.urllib3.exceptions import InsecureRequestWarning
2017-04-18 22:20:43 +02:00
TIMEOUT = 3
2017-04-15 15:02:48 +02:00
logger = logging.getLogger('qrcode_reader')
logging.basicConfig(level=logging.INFO)
logging.getLogger('requests').setLevel(logging.WARNING)
logging.getLogger('urllib3').setLevel(logging.WARNING)
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
2017-04-01 14:43:22 +02:00
def convert_obj(obj):
try:
return int(obj)
except:
pass
if isinstance(obj, str):
obj_l = obj.lower()
if obj_l in ['true', 'on', 'yes']:
return True
elif obj_l in ['false', 'off', 'no']:
return False
2017-04-15 17:34:06 +02:00
elif obj == '%NOW%':
2017-10-15 16:30:09 +02:00
return str(datetime.datetime.utcnow())
return obj
def convert(seq):
if isinstance(seq, dict):
d = {}
for key, item in seq.items():
d[key] = convert(item)
return d
if isinstance(seq, (list, tuple)):
return [convert(x) for x in seq]
return convert_obj(seq)
2017-04-15 17:34:06 +02:00
class ImprovedEncoder(json.JSONEncoder):
"""Enhance the default JSON encoder to serialize datetime instances."""
def default(self, o):
if isinstance(o, (datetime.datetime, datetime.date,
datetime.time, datetime.timedelta)):
try:
return str(o)
2017-04-23 17:17:54 +02:00
except Exception:
2017-04-15 17:34:06 +02:00
pass
elif isinstance(o, set):
return list(o)
return json.JSONEncoder.default(self, o)
# Inject our class as the default encoder.
json._default_encoder = ImprovedEncoder()
2017-04-01 14:43:22 +02:00
class Connector():
def __init__(self, cfg):
self.cfg = cfg
self.session = None
self.url = cfg['eventman']['url']
self.login_url = urllib.parse.urljoin(self.url, '/v1.0/login')
self.checkin_url = urllib.parse.urljoin(self.url, os.path.join('/v1.0/events/',
cfg['event']['id'], 'tickets/'))
self.login()
def login(self):
2017-04-23 17:24:13 +02:00
logger.debug('connecting to eventman at %s' % self.login_url)
self.session = requests.Session()
self.session.verify = False
ca = cfg['eventman'].get('ca')
if ca and os.path.isfile(ca):
self.session.verify = ca
username = cfg['eventman'].get('username')
password = cfg['eventman'].get('password')
params = {}
if username:
params['username'] = username
if password:
params['password'] = password
req = self.session.post(self.login_url, json=params, timeout=TIMEOUT)
req.raise_for_status()
req.connection.close()
logger.info('connection to eventman at %s established' % self.login_url)
2017-04-01 14:43:22 +02:00
def checkin(self, code):
2017-04-15 14:22:05 +02:00
msg = 'scanning code %s: ' % code
2017-04-22 13:41:03 +02:00
limit_field = None
try:
limit_field = self.cfg['event'].getint('limit_field')
except:
pass
if limit_field:
code = code[:limit_field]
_searchFor = '%s:%s' % (cfg['event']['field'], code)
params = {cfg['event']['field']: code, '_errorMessage': 'code: %s' % code, '_searchFor': _searchFor}
2017-04-15 14:22:05 +02:00
checkin_url = self.checkin_url + '?' + urllib.parse.urlencode(params)
json = convert(dict(self.cfg['actions']))
2017-04-18 22:20:43 +02:00
req = self.session.put(checkin_url, json=json, timeout=TIMEOUT)
2017-04-15 15:02:48 +02:00
error = False
try:
req.raise_for_status()
2017-04-15 14:22:05 +02:00
msg += 'ok'
2017-04-23 17:17:54 +02:00
except requests.exceptions.HTTPError:
2017-04-15 15:02:48 +02:00
error = True
2017-04-15 14:22:05 +02:00
msg += 'error: %s' % req.json().get('message')
2017-04-15 15:02:48 +02:00
if not error:
logger.info(msg)
else:
logger.warning(msg)
2017-04-01 14:43:22 +02:00
req.connection.close()
def scan(port):
retry = 1
2017-04-15 16:32:53 +02:00
logger.info('trying to connect to %s, please wait...' % port)
while True:
2017-04-15 15:02:48 +02:00
logger.debug('waiting for connection on port %s...' % port)
try:
ser = serial.Serial(port=port, timeout=1)
break
except serial.serialutil.SerialException as ex:
2017-04-15 14:22:05 +02:00
if retry >= 20:
2017-04-15 15:02:48 +02:00
logger.error('unable to connect: %s' % ex)
sys.exit(2)
time.sleep(1)
retry += 1
2017-04-15 15:02:48 +02:00
logger.info('connected to %s' % port)
2017-04-01 14:43:22 +02:00
ser_io = io.TextIOWrapper(io.BufferedRWPair(ser, ser, 1), newline='\r', line_buffering=True)
while True:
2017-04-15 16:32:53 +02:00
try:
line = ser_io.readline().strip()
except serial.serialutil.SerialException as ex:
logger.error('disconnected: %s' % ex)
sys.exit(3)
2017-04-01 14:43:22 +02:00
if not line:
continue
yield line
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument('-c', '--code', help='specify a single code', action='store')
parser.add_argument('--config', help='user a different configuration file (default: qrcode_reader.ini)',
action='store', default='qrcode_reader.ini')
args = parser.parse_args()
cfg = configparser.ConfigParser(interpolation=configparser.ExtendedInterpolation())
cfg.read(args.config)
2017-04-15 15:02:48 +02:00
if cfg['qrcode_reader'].getboolean('debug'):
2017-04-18 22:20:43 +02:00
logger.setLevel(logging.DEBUG)
2017-04-23 17:24:13 +02:00
try:
connector = Connector(cfg)
except Exception as ex:
logger.error('unable to connect to %s: %s' % (cfg['eventman']['url'], ex))
sys.exit(1)
if args.code:
connector.checkin(args.code)
else:
try:
for code in scan(port=cfg['connection']['port']):
2017-04-23 17:24:13 +02:00
try:
connector.checkin(code)
except Exception as ex:
logger.error('error at checkin for code %s: %s', code, ex)
except KeyboardInterrupt:
logger.info('exiting...')