users management

This commit is contained in:
Davide Alberani 2017-01-15 15:16:41 +01:00
parent 3cac351b88
commit 6b4ed15451
2 changed files with 123 additions and 54 deletions

77
ibt2.py
View file

@ -113,7 +113,7 @@ class BaseHandler(tornado.web.RequestHandler):
@property
def current_user(self):
"""Retrieve current user name from the secure cookie."""
"""Retrieve current user ID from the secure cookie."""
return self.get_secure_cookie("user")
@property
@ -124,10 +124,9 @@ class BaseHandler(tornado.web.RequestHandler):
return self._users_cache[current_user]
user_info = {}
if current_user:
user_info['username'] = current_user
res = self.db.query('users', {'username': current_user})
if res:
user = res[0]
user_info['_id'] = current_user
user = self.db.getOne('users', {'_id': user_info['_id']})
if user:
user_info = user
self._users_cache[current_user] = user_info
return user_info
@ -367,24 +366,25 @@ class UsersHandler(CollectionHandler):
document = 'user'
collection = 'users'
def filter_get(self, data):
if 'password' in data:
del data['password']
return data
def filter_get_all(self, data):
if 'users' not in data:
return data
for user in data['users']:
@gen.coroutine
def get(self, id_=None, **kwargs):
if id_:
output = self.db.getOne(self.collection, {'_id': id_})
if 'password' in output:
del output['password']
else:
output = {self.collection: self.db.query(self.collection, self.arguments)}
for user in output['users']:
if 'password' in user:
del user['password']
return data
self.write(output)
@gen.coroutine
def get(self, id_=None, resource=None, resource_id=None, acl=True, **kwargs):
super(UsersHandler, self).get(id_, resource, resource_id, acl=acl, **kwargs)
def filter_input_post_all(self, data):
def post(self, **kwargs):
data = escape.json_decode(self.request.body or '{}')
self._clean_dict(data)
if '_id' in data:
del data['_id']
username = (data.get('username') or '').strip()
password = (data.get('password') or '').strip()
email = (data.get('email') or '').strip()
@ -393,10 +393,22 @@ class UsersHandler(CollectionHandler):
res = self.db.query('users', {'username': username})
if res:
raise InputException('username already exists')
return {'username': username, 'password': utils.hash_password(password),
'email': email}
data['username'] = username
data['email'] = email
data['password'] = utils.hash_password(password)
if 'isAdmin' in data:
del data['isAdmin']
doc = self.db.add(self.collection, data)
if 'password' in doc:
del doc['password']
self.write(doc)
def filter_input_put(self, data):
@gen.coroutine
def put(self, id_=None, **kwargs):
data = escape.json_decode(self.request.body or '{}')
self._clean_dict(data)
if id_ is None:
return self.build_error(status=404, message='unable to access the resource')
old_pwd = data.get('old_password')
new_pwd = data.get('new_password')
if old_pwd is not None:
@ -410,15 +422,21 @@ class UsersHandler(CollectionHandler):
if '_id' in data:
# Avoid overriding _id
del data['_id']
return data
if str(self.current_user_info.get('_id')) != id_ and not self.current_user_info.get('isAdmin'):
return self.build_error(status=401, message='insufficient permissions: current user')
merged, doc = self.db.update(self.collection, {'_id': id_}, data)
self.write(doc)
@gen.coroutine
def put(self, id_=None, resource=None, resource_id=None, **kwargs):
def delete(self, id_=None, **kwargs):
if id_ is None:
return self.build_error(status=404, message='unable to access the resource')
if str(self.current_user_info.get('_id')) != id_:
if str(self.current_user_info.get('_id')) != id_ and not self.current_user_info.get('isAdmin'):
return self.build_error(status=401, message='insufficient permissions: current user')
super(UsersHandler, self).put(id_, resource, resource_id, **kwargs)
if id_ in self._users_cache:
del self._users_cache[id_]
howMany = self.db.delete(self.collection, id_)
self.write({'success': True, 'deleted entries': howMany.get('n')})
class SettingsHandler(BaseHandler):
@ -466,10 +484,11 @@ class LoginHandler(RootHandler):
self.write({'error': True, 'message': 'missing username or password'})
return
authorized, user = self.user_authorized(username, password)
if authorized and user.get('username'):
if authorized and 'username' in user and '_id' in user:
id_ = str(user['_id'])
username = user['username']
logging.info('successful login for user %s' % username)
self.set_secure_cookie("user", username)
logging.info('successful login for user %s (id: %s)' % (username, id_))
self.set_secure_cookie("user", id_)
user_info = self.current_user_info
if 'password' in user_info:
del user_info['password']

View file

@ -1,7 +1,7 @@
#!/usr/bin/env python
"""I'll Be There 2 (ibt2) - tests
"""I'll Be There, 2 (ibt2) - tests
Copyright 2016 Davide Alberani <da@erlug.linux.it>
Copyright 2016-2017 Davide Alberani <da@erlug.linux.it>
RaspiBO <info@raspibo.org>
Licensed under the Apache License, Version 2.0 (the "License");
@ -32,15 +32,16 @@ def dictInDict(d, dContainer):
class Ibt2Tests(unittest.TestCase):
#@classmethod
#def setUpClass(cls):
def setUp(self):
self.monco_conn = monco.Monco(dbName=DB_NAME)
self.connection = self.monco_conn.connection
self.db = self.monco_conn.db
self.connection.drop_database(DB_NAME)
self.db['attendees'].drop()
self.db['users'].remove({'username': 'newuser'})
self.db['users'].remove({'username': 'newuser2'})
def tearDown(self):
return
self.add_attendee({'day': '2017-01-15', 'name': 'A name', 'group': 'group A'})
self.add_attendee({'day': '2017-01-16', 'name': 'A new name', 'group': 'group C'})
self.add_attendee({'day': '2017-01-15', 'name': 'Another name', 'group': 'group A'})
@ -52,8 +53,6 @@ class Ibt2Tests(unittest.TestCase):
return r
def test_add_attendee(self):
# POST /attendees/ {name: 'A Name', day: '2017-01-15', group: 'A group'}
# GET /attendees/:id
attendee = {'name': 'A Name', 'day': '2017-01-15', 'group': 'A group'}
r = self.add_attendee(attendee)
rj = r.json()
@ -65,8 +64,6 @@ class Ibt2Tests(unittest.TestCase):
self.assertTrue(dictInDict(attendee, rj))
def test_put_attendee(self):
# POST /attendees/ {name: 'A Name', day: '2017-01-15', group: 'A group'}
# GET /attendees/:id
attendee = {'name': 'A Name', 'day': '2017-01-15', 'group': 'A group'}
r = self.add_attendee(attendee)
update = {'notes': 'A note'}
@ -83,8 +80,6 @@ class Ibt2Tests(unittest.TestCase):
self.assertTrue(dictInDict(final, rj))
def test_delete_attendee(self):
# POST /attendees/ {name: 'A Name', day: '2017-01-15', group: 'A group'}
# GET /attendees/:id
attendee = {'name': 'A Name', 'day': '2017-01-15', 'group': 'A group'}
r = self.add_attendee(attendee)
id_ = r.json().get('_id')
@ -118,19 +113,74 @@ class Ibt2Tests(unittest.TestCase):
self.assertEqual(rj,
{"days": [{"groups_count": 2, "day": "2017-01-15"}, {"groups_count": 1, "day": "2017-01-16"}]})
def _test_post_day_group(self):
# POST /days/ {day: '2017-01-04'}
# GET /days/2017-01-04
day = '2017-01-15'
query = {'day': day, 'groups': [{'name': 'group1'}]}
r = requests.post(BASE_URL + 'days', json=query)
def test_create_user(self):
r = requests.post(BASE_URL + 'users', json={'username': 'newuser', 'password': 'ibt2'})
r.raise_for_status()
s = self.login('newuser', 'ibt2')
r = s.get(BASE_URL + 'users/current')
r.raise_for_status()
def test_update_user(self):
r = requests.post(BASE_URL + 'users', json={'username': 'newuser', 'password': 'ibt2'})
r.raise_for_status()
id_ = r.json()['_id']
r = requests.post(BASE_URL + 'users', json={'username': 'newuser2', 'password': 'ibt2'})
r.raise_for_status()
id2_ = r.json()['_id']
r = requests.put(BASE_URL + 'users/' + id_, json={'email': 't@example.com'})
self.assertRaises(r.raise_for_status)
s = self.login('newuser', 'ibt2')
r = s.put(BASE_URL + 'users/' + id_, json={'email': 'test@example.com'})
r.raise_for_status()
self.assertEqual(r.json().get('email'), 'test@example.com')
r = s.put(BASE_URL + 'users/' + id2_, json={'email': 'test@example.com'})
self.assertRaises(r.raise_for_status)
s = self.login('admin', 'ibt2')
r = s.put(BASE_URL + 'users/' + id_, json={'email': 'test2@example.com'})
r.raise_for_status()
self.assertEqual(r.json().get('email'), 'test2@example.com')
def test_delete_user(self):
r = requests.post(BASE_URL + 'users', json={'username': 'newuser', 'password': 'ibt2'})
r.raise_for_status()
id_ = r.json()['_id']
r = requests.post(BASE_URL + 'users', json={'username': 'newuser2', 'password': 'ibt2'})
r.raise_for_status()
id2_ = r.json()['_id']
r = requests.delete(BASE_URL + 'users/' + id_)
self.assertRaises(r.raise_for_status)
s = self.login('newuser', 'ibt2')
r = s.delete(BASE_URL + 'users/' + id_)
r.raise_for_status()
r = s.delete(BASE_URL + 'users/' + id2_)
self.assertRaises(r.raise_for_status)
s = self.login('admin', 'ibt2')
r = s.delete(BASE_URL + 'users/' + id2_)
r.raise_for_status()
def test_duplicate_user(self):
r = requests.post(BASE_URL + 'users', json={'username': 'newuser', 'password': 'ibt2'})
r.raise_for_status()
r = requests.post(BASE_URL + 'users', json={'username': 'newuser', 'password': 'ibt3'})
self.assertRaises(r.raise_for_status)
def login(self, username, password):
s = requests.Session()
r = s.post(BASE_URL + 'login', json={'username': username, 'password': password})
r.raise_for_status()
return s
def test_created_by(self):
s = self.login('admin', 'ibt2')
r = s.get(BASE_URL + 'users/current')
r.raise_for_status()
user_id = r.json()['_id']
attendee = {'day': '2017-01-15', 'name': 'A name', 'group': 'group A'}
r = s.post('%sattendees' % BASE_URL, json=attendee)
r.raise_for_status()
rj = r.json()
self.assertTrue(dictInDict(query, rj))
r = requests.get('%s%s/%s' % (BASE_URL, 'days', day))
r.raise_for_status()
rj = r.json()
self.assertTrue(dictInDict(query, rj))
self.assertEqual(user_id, rj['created_by'])
self.assertEqual(user_id, rj['updated_by'])
if __name__ == '__main__':
unittest.main(verbosity=2)