From 6b4ed154518b38a324c2f191a2617c36ebbc8796 Mon Sep 17 00:00:00 2001 From: Davide Alberani Date: Sun, 15 Jan 2017 15:16:41 +0100 Subject: [PATCH] users management --- ibt2.py | 81 +++++++++++++++++++++++--------------- tests/ibt2_tests.py | 96 ++++++++++++++++++++++++++++++++++----------- 2 files changed, 123 insertions(+), 54 deletions(-) diff --git a/ibt2.py b/ibt2.py index d20e412..ae8b840 100755 --- a/ibt2.py +++ b/ibt2.py @@ -113,7 +113,7 @@ class BaseHandler(tornado.web.RequestHandler): @property def current_user(self): - """Retrieve current user name from the secure cookie.""" + """Retrieve current user ID from the secure cookie.""" return self.get_secure_cookie("user") @property @@ -124,10 +124,9 @@ class BaseHandler(tornado.web.RequestHandler): return self._users_cache[current_user] user_info = {} if current_user: - user_info['username'] = current_user - res = self.db.query('users', {'username': current_user}) - if res: - user = res[0] + user_info['_id'] = current_user + user = self.db.getOne('users', {'_id': user_info['_id']}) + if user: user_info = user self._users_cache[current_user] = user_info return user_info @@ -367,24 +366,25 @@ class UsersHandler(CollectionHandler): document = 'user' collection = 'users' - def filter_get(self, data): - if 'password' in data: - del data['password'] - return data - - def filter_get_all(self, data): - if 'users' not in data: - return data - for user in data['users']: - if 'password' in user: - del user['password'] - return data + @gen.coroutine + def get(self, id_=None, **kwargs): + if id_: + output = self.db.getOne(self.collection, {'_id': id_}) + if 'password' in output: + del output['password'] + else: + output = {self.collection: self.db.query(self.collection, self.arguments)} + for user in output['users']: + if 'password' in user: + del user['password'] + self.write(output) @gen.coroutine - def get(self, id_=None, resource=None, resource_id=None, acl=True, **kwargs): - super(UsersHandler, self).get(id_, resource, resource_id, acl=acl, **kwargs) - - def filter_input_post_all(self, data): + def post(self, **kwargs): + data = escape.json_decode(self.request.body or '{}') + self._clean_dict(data) + if '_id' in data: + del data['_id'] username = (data.get('username') or '').strip() password = (data.get('password') or '').strip() email = (data.get('email') or '').strip() @@ -393,10 +393,22 @@ class UsersHandler(CollectionHandler): res = self.db.query('users', {'username': username}) if res: raise InputException('username already exists') - return {'username': username, 'password': utils.hash_password(password), - 'email': email} + data['username'] = username + data['email'] = email + data['password'] = utils.hash_password(password) + if 'isAdmin' in data: + del data['isAdmin'] + doc = self.db.add(self.collection, data) + if 'password' in doc: + del doc['password'] + self.write(doc) - def filter_input_put(self, data): + @gen.coroutine + def put(self, id_=None, **kwargs): + data = escape.json_decode(self.request.body or '{}') + self._clean_dict(data) + if id_ is None: + return self.build_error(status=404, message='unable to access the resource') old_pwd = data.get('old_password') new_pwd = data.get('new_password') if old_pwd is not None: @@ -410,15 +422,21 @@ class UsersHandler(CollectionHandler): if '_id' in data: # Avoid overriding _id del data['_id'] - return data + if str(self.current_user_info.get('_id')) != id_ and not self.current_user_info.get('isAdmin'): + return self.build_error(status=401, message='insufficient permissions: current user') + merged, doc = self.db.update(self.collection, {'_id': id_}, data) + self.write(doc) @gen.coroutine - def put(self, id_=None, resource=None, resource_id=None, **kwargs): + def delete(self, id_=None, **kwargs): if id_ is None: return self.build_error(status=404, message='unable to access the resource') - if str(self.current_user_info.get('_id')) != id_: + if str(self.current_user_info.get('_id')) != id_ and not self.current_user_info.get('isAdmin'): return self.build_error(status=401, message='insufficient permissions: current user') - super(UsersHandler, self).put(id_, resource, resource_id, **kwargs) + if id_ in self._users_cache: + del self._users_cache[id_] + howMany = self.db.delete(self.collection, id_) + self.write({'success': True, 'deleted entries': howMany.get('n')}) class SettingsHandler(BaseHandler): @@ -466,10 +484,11 @@ class LoginHandler(RootHandler): self.write({'error': True, 'message': 'missing username or password'}) return authorized, user = self.user_authorized(username, password) - if authorized and user.get('username'): + if authorized and 'username' in user and '_id' in user: + id_ = str(user['_id']) username = user['username'] - logging.info('successful login for user %s' % username) - self.set_secure_cookie("user", username) + logging.info('successful login for user %s (id: %s)' % (username, id_)) + self.set_secure_cookie("user", id_) user_info = self.current_user_info if 'password' in user_info: del user_info['password'] diff --git a/tests/ibt2_tests.py b/tests/ibt2_tests.py index f08000d..e3af52d 100755 --- a/tests/ibt2_tests.py +++ b/tests/ibt2_tests.py @@ -1,8 +1,8 @@ #!/usr/bin/env python -"""I'll Be There 2 (ibt2) - tests +"""I'll Be There, 2 (ibt2) - tests -Copyright 2016 Davide Alberani - RaspiBO +Copyright 2016-2017 Davide Alberani + RaspiBO Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -32,15 +32,16 @@ def dictInDict(d, dContainer): class Ibt2Tests(unittest.TestCase): - #@classmethod - #def setUpClass(cls): def setUp(self): self.monco_conn = monco.Monco(dbName=DB_NAME) self.connection = self.monco_conn.connection self.db = self.monco_conn.db - self.connection.drop_database(DB_NAME) + self.db['attendees'].drop() + self.db['users'].remove({'username': 'newuser'}) + self.db['users'].remove({'username': 'newuser2'}) def tearDown(self): + return self.add_attendee({'day': '2017-01-15', 'name': 'A name', 'group': 'group A'}) self.add_attendee({'day': '2017-01-16', 'name': 'A new name', 'group': 'group C'}) self.add_attendee({'day': '2017-01-15', 'name': 'Another name', 'group': 'group A'}) @@ -52,8 +53,6 @@ class Ibt2Tests(unittest.TestCase): return r def test_add_attendee(self): - # POST /attendees/ {name: 'A Name', day: '2017-01-15', group: 'A group'} - # GET /attendees/:id attendee = {'name': 'A Name', 'day': '2017-01-15', 'group': 'A group'} r = self.add_attendee(attendee) rj = r.json() @@ -65,8 +64,6 @@ class Ibt2Tests(unittest.TestCase): self.assertTrue(dictInDict(attendee, rj)) def test_put_attendee(self): - # POST /attendees/ {name: 'A Name', day: '2017-01-15', group: 'A group'} - # GET /attendees/:id attendee = {'name': 'A Name', 'day': '2017-01-15', 'group': 'A group'} r = self.add_attendee(attendee) update = {'notes': 'A note'} @@ -83,8 +80,6 @@ class Ibt2Tests(unittest.TestCase): self.assertTrue(dictInDict(final, rj)) def test_delete_attendee(self): - # POST /attendees/ {name: 'A Name', day: '2017-01-15', group: 'A group'} - # GET /attendees/:id attendee = {'name': 'A Name', 'day': '2017-01-15', 'group': 'A group'} r = self.add_attendee(attendee) id_ = r.json().get('_id') @@ -118,19 +113,74 @@ class Ibt2Tests(unittest.TestCase): self.assertEqual(rj, {"days": [{"groups_count": 2, "day": "2017-01-15"}, {"groups_count": 1, "day": "2017-01-16"}]}) - def _test_post_day_group(self): - # POST /days/ {day: '2017-01-04'} - # GET /days/2017-01-04 - day = '2017-01-15' - query = {'day': day, 'groups': [{'name': 'group1'}]} - r = requests.post(BASE_URL + 'days', json=query) + def test_create_user(self): + r = requests.post(BASE_URL + 'users', json={'username': 'newuser', 'password': 'ibt2'}) + r.raise_for_status() + s = self.login('newuser', 'ibt2') + r = s.get(BASE_URL + 'users/current') + r.raise_for_status() + + def test_update_user(self): + r = requests.post(BASE_URL + 'users', json={'username': 'newuser', 'password': 'ibt2'}) + r.raise_for_status() + id_ = r.json()['_id'] + r = requests.post(BASE_URL + 'users', json={'username': 'newuser2', 'password': 'ibt2'}) + r.raise_for_status() + id2_ = r.json()['_id'] + r = requests.put(BASE_URL + 'users/' + id_, json={'email': 't@example.com'}) + self.assertRaises(r.raise_for_status) + s = self.login('newuser', 'ibt2') + r = s.put(BASE_URL + 'users/' + id_, json={'email': 'test@example.com'}) + r.raise_for_status() + self.assertEqual(r.json().get('email'), 'test@example.com') + r = s.put(BASE_URL + 'users/' + id2_, json={'email': 'test@example.com'}) + self.assertRaises(r.raise_for_status) + s = self.login('admin', 'ibt2') + r = s.put(BASE_URL + 'users/' + id_, json={'email': 'test2@example.com'}) + r.raise_for_status() + self.assertEqual(r.json().get('email'), 'test2@example.com') + + def test_delete_user(self): + r = requests.post(BASE_URL + 'users', json={'username': 'newuser', 'password': 'ibt2'}) + r.raise_for_status() + id_ = r.json()['_id'] + r = requests.post(BASE_URL + 'users', json={'username': 'newuser2', 'password': 'ibt2'}) + r.raise_for_status() + id2_ = r.json()['_id'] + r = requests.delete(BASE_URL + 'users/' + id_) + self.assertRaises(r.raise_for_status) + s = self.login('newuser', 'ibt2') + r = s.delete(BASE_URL + 'users/' + id_) + r.raise_for_status() + r = s.delete(BASE_URL + 'users/' + id2_) + self.assertRaises(r.raise_for_status) + s = self.login('admin', 'ibt2') + r = s.delete(BASE_URL + 'users/' + id2_) + r.raise_for_status() + + def test_duplicate_user(self): + r = requests.post(BASE_URL + 'users', json={'username': 'newuser', 'password': 'ibt2'}) + r.raise_for_status() + r = requests.post(BASE_URL + 'users', json={'username': 'newuser', 'password': 'ibt3'}) + self.assertRaises(r.raise_for_status) + + def login(self, username, password): + s = requests.Session() + r = s.post(BASE_URL + 'login', json={'username': username, 'password': password}) + r.raise_for_status() + return s + + def test_created_by(self): + s = self.login('admin', 'ibt2') + r = s.get(BASE_URL + 'users/current') + r.raise_for_status() + user_id = r.json()['_id'] + attendee = {'day': '2017-01-15', 'name': 'A name', 'group': 'group A'} + r = s.post('%sattendees' % BASE_URL, json=attendee) r.raise_for_status() rj = r.json() - self.assertTrue(dictInDict(query, rj)) - r = requests.get('%s%s/%s' % (BASE_URL, 'days', day)) - r.raise_for_status() - rj = r.json() - self.assertTrue(dictInDict(query, rj)) + self.assertEqual(user_id, rj['created_by']) + self.assertEqual(user_id, rj['updated_by']) if __name__ == '__main__': unittest.main(verbosity=2)