fixes #6: global settings

This commit is contained in:
Davide Alberani 2017-02-26 11:36:00 +01:00
parent 3082e6cbad
commit e141723cf0
3 changed files with 90 additions and 8 deletions

49
ibt2.py
View file

@ -503,6 +503,40 @@ class CurrentUserHandler(BaseHandler):
self.write(user_info) self.write(user_info)
class SettingsHandler(BaseHandler):
"""Handle global settings."""
collection = 'settings'
def get(self, id_=None):
query = {}
if id_ is not None:
query['_id'] = id_
res = self.db.query(self.collection, query)
res = dict((i.get('_id'), i.get('value')) for i in res if '_id' in i)
if id_ is not None:
res = {id_: res.get(id_)}
self.write(res)
def post(self, id_=None):
if not self.current_user_info.get('isAdmin'):
return self.build_error(status=401, message='insufficient permissions: must be an admin')
data = self.clean_body
if id_ is not None:
# if we access a specific resource, we assume the data is in {_id: value} format
if id_ not in data:
self.write({'success': False})
return
data = {id_: data[id_]}
for key, value in data.items():
if self.db.get(self.collection, key):
self.db.update(self.collection, {'_id': key}, {'value': value})
else:
self.db.add(self.collection, {'_id': key, 'value': value})
self.write({'success': True})
put = post
class LoginHandler(RootHandler): class LoginHandler(RootHandler):
"""Handle user authentication requests.""" """Handle user authentication requests."""
@ -594,7 +628,7 @@ def run():
settings[key] = setting settings[key] = setting
init_params = dict(db=db_connector, listen_port=options.port, logger=logger, init_params = dict(db=db_connector, listen_port=options.port, logger=logger,
ssl_options=ssl_options, settings=settings) ssl_options=ssl_options, global_settings=settings)
# If not present, we store a user 'admin' with password 'ibt2' into the database. # If not present, we store a user 'admin' with password 'ibt2' into the database.
if not db_connector.query('users', {'username': 'admin'}): if not db_connector.query('users', {'username': 'admin'}):
@ -603,14 +637,14 @@ def run():
'isAdmin': True}) 'isAdmin': True})
# If present, use the cookie_secret stored into the database. # If present, use the cookie_secret stored into the database.
cookie_secret = settings.get('server_cookie_secret') cookie_secret = db_connector.get('server_settings', 'server_cookie_secret')
if cookie_secret: if cookie_secret:
cookie_secret = cookie_secret['cookie_secret'] cookie_secret = cookie_secret['value']
else: else:
# the salt guarantees its uniqueness # the salt guarantees its uniqueness
cookie_secret = utils.hash_password('__COOKIE_SECRET__') cookie_secret = utils.hash_password('__COOKIE_SECRET__')
db_connector.add('settings', db_connector.add('server_settings',
{'setting': 'server_cookie_secret', 'cookie_secret': cookie_secret}) {'_id': 'server_cookie_secret', 'value': cookie_secret})
_days_path = r"/days/?(?P<day>[\d_-]+)?" _days_path = r"/days/?(?P<day>[\d_-]+)?"
_days_info_path = r"/days/(?P<day>[\d_-]+)/info" _days_info_path = r"/days/(?P<day>[\d_-]+)/info"
@ -619,6 +653,7 @@ def run():
_attendees_path = r"/attendees/?(?P<id_>[\w\d_-]+)?" _attendees_path = r"/attendees/?(?P<id_>[\w\d_-]+)?"
_current_user_path = r"/users/current/?" _current_user_path = r"/users/current/?"
_users_path = r"/users/?(?P<id_>[\w\d_-]+)?/?(?P<resource>[\w\d_-]+)?/?(?P<resource_id>[\w\d_-]+)?" _users_path = r"/users/?(?P<id_>[\w\d_-]+)?/?(?P<resource>[\w\d_-]+)?/?(?P<resource_id>[\w\d_-]+)?"
_settings_path = r"/settings/?(?P<id_>[\w\d_ -]+)?/?"
application = tornado.web.Application([ application = tornado.web.Application([
(_attendees_path, AttendeesHandler, init_params), (_attendees_path, AttendeesHandler, init_params),
(r'/v%s%s' % (API_VERSION, _attendees_path), AttendeesHandler, init_params), (r'/v%s%s' % (API_VERSION, _attendees_path), AttendeesHandler, init_params),
@ -634,6 +669,8 @@ def run():
(r'/v%s%s' % (API_VERSION, _current_user_path), CurrentUserHandler, init_params), (r'/v%s%s' % (API_VERSION, _current_user_path), CurrentUserHandler, init_params),
(_users_path, UsersHandler, init_params), (_users_path, UsersHandler, init_params),
(r'/v%s%s' % (API_VERSION, _users_path), UsersHandler, init_params), (r'/v%s%s' % (API_VERSION, _users_path), UsersHandler, init_params),
(_settings_path, SettingsHandler, init_params),
(r'/v%s%s' % (API_VERSION, _settings_path), SettingsHandler, init_params),
(r"/(?:index.html)?", RootHandler, init_params), (r"/(?:index.html)?", RootHandler, init_params),
(r'/login', LoginHandler, init_params), (r'/login', LoginHandler, init_params),
(r'/v%s/login' % API_VERSION, LoginHandler, init_params), (r'/v%s/login' % API_VERSION, LoginHandler, init_params),
@ -642,7 +679,7 @@ def run():
(r'/?(.*)', tornado.web.StaticFileHandler, {"path": "dist"}) (r'/?(.*)', tornado.web.StaticFileHandler, {"path": "dist"})
], ],
static_path=os.path.join(os.path.dirname(__file__), "dist/static"), static_path=os.path.join(os.path.dirname(__file__), "dist/static"),
cookie_secret='__COOKIE_SECRET__', cookie_secret=cookie_secret,
login_url='/login', login_url='/login',
debug=options.debug) debug=options.debug)
http_server = tornado.httpserver.HTTPServer(application, ssl_options=ssl_options or None) http_server = tornado.httpserver.HTTPServer(application, ssl_options=ssl_options or None)

View file

@ -162,7 +162,7 @@ class Monco(object):
:param collection: search for documents in this collection :param collection: search for documents in this collection
:type collection: str :type collection: str
:param query: search for documents with those attributes :param query: search for documents with those attributes
:type query: dict or None :type query: dict, list or None
:returns: list of matching documents :returns: list of matching documents
:rtype: list :rtype: list

View file

@ -39,6 +39,7 @@ class Ibt2Tests(unittest.TestCase):
self.db['attendees'].drop() self.db['attendees'].drop()
self.db['days'].drop() self.db['days'].drop()
self.db['groups'].drop() self.db['groups'].drop()
self.db['settings'].drop()
self.db['users'].delete_one({'username': 'newuser'}) self.db['users'].delete_one({'username': 'newuser'})
self.db['users'].delete_one({'username': 'newuser2'}) self.db['users'].delete_one({'username': 'newuser2'})
@ -46,6 +47,7 @@ class Ibt2Tests(unittest.TestCase):
self.db['attendees'].drop() self.db['attendees'].drop()
self.db['days'].drop() self.db['days'].drop()
self.db['groups'].drop() self.db['groups'].drop()
self.db['settings'].drop()
self.db['users'].delete_one({'username': 'newuser'}) self.db['users'].delete_one({'username': 'newuser'})
self.db['users'].delete_one({'username': 'newuser2'}) self.db['users'].delete_one({'username': 'newuser2'})
@ -161,7 +163,7 @@ class Ibt2Tests(unittest.TestCase):
r.connection.close() r.connection.close()
s = self.login('newuser', 'ibt2') s = self.login('newuser', 'ibt2')
r = s.delete(BASE_URL + 'users/' + id_) r = s.delete(BASE_URL + 'users/' + id_)
r.raise_for_status() self.assertRaises(requests.exceptions.HTTPError, r.raise_for_status)
r.connection.close() r.connection.close()
r = s.delete(BASE_URL + 'users/' + id2_) r = s.delete(BASE_URL + 'users/' + id2_)
self.assertRaises(requests.exceptions.HTTPError, r.raise_for_status) self.assertRaises(requests.exceptions.HTTPError, r.raise_for_status)
@ -235,5 +237,48 @@ class Ibt2Tests(unittest.TestCase):
self.assertTrue(rj == {}) self.assertTrue(rj == {})
r.connection.close() r.connection.close()
def test_settings(self):
r = requests.get(BASE_URL + 'settings/non-existant')
r.raise_for_status()
rj = r.json()
r.connection.close()
self.assertEqual({'non-existant': None}, rj)
settings = {'key1': 'value1', 'key2': 'value2'}
r = requests.post(BASE_URL + 'settings', json=settings)
self.assertRaises(requests.exceptions.HTTPError, r.raise_for_status)
s = self.login('admin', 'ibt2')
r = s.post(BASE_URL + 'settings', json=settings)
r.raise_for_status()
rj = r.json()
r.connection.close()
self.assertTrue(rj.get('success'), True)
r = requests.get(BASE_URL + 'settings')
r.raise_for_status()
rj = r.json()
r.connection.close()
self.assertEqual(rj, settings)
r = requests.get(BASE_URL + 'settings/key1')
r.raise_for_status()
rj = r.json()
r.connection.close()
self.assertEqual(rj, {'key1': 'value1'})
r = requests.get(BASE_URL + 'settings/key2')
r.raise_for_status()
rj = r.json()
r.connection.close()
self.assertEqual(rj, {'key2': 'value2'})
r = s.put(BASE_URL + 'settings/key2', json={'key2': 'value3'})
r.raise_for_status()
rj = r.json()
r.connection.close()
self.assertTrue(rj.get('success'), True)
r = requests.get(BASE_URL + 'settings/key2')
r.raise_for_status()
rj = r.json()
r.connection.close()
self.assertEqual(rj, {'key2': 'value3'})
if __name__ == '__main__': if __name__ == '__main__':
unittest.main(verbosity=2) unittest.main(verbosity=2)