123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308 |
- """Monco: a MongoDB database backend
- Classes and functions used to issue queries to a MongoDB database.
- Copyright 2016-2017 Davide Alberani <da@erlug.linux.it>
- RaspiBO <info@raspibo.org>
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- """
- import re
- import pymongo
- from bson.objectid import ObjectId
- re_objectid = re.compile(r'[0-9a-f]{24}')
- _force_conversion = {
- '_id': ObjectId
- }
- def convert_obj(obj):
- """Convert an object in a format suitable to be stored in MongoDB.
- :param obj: object to convert
- :returns: object that can be stored in MongoDB.
- """
- if obj is None:
- return None
- if isinstance(obj, bool):
- return obj
- try:
- if re_objectid.match(obj):
- return ObjectId(obj)
- except:
- pass
- return obj
- def convert(seq):
- """Convert an object to a format suitable to be stored in MongoDB,
- descending lists, tuples and dictionaries (a copy is returned).
- :param seq: sequence or object to convert
- :returns: object that can be stored in MongoDB.
- """
- if isinstance(seq, dict):
- d = {}
- for key, item in seq.items():
- if key in _force_conversion:
- try:
- d[key] = _force_conversion[key](item)
- except:
- d[key] = item
- else:
- d[key] = convert(item)
- return d
- if isinstance(seq, (list, tuple)):
- return [convert(x) for x in seq]
- return convert_obj(seq)
- class MoncoError(Exception):
- """Base class for Monco exceptions."""
- pass
- class MoncoConnectionError(MoncoError):
- """Monco exceptions raise when a connection problem occurs."""
- pass
- class Monco(object):
- """MongoDB connector."""
- db = None
- connection = None
- # map operations on lists of items.
- _operations = {
- 'update': '$set',
- 'append': '$push',
- 'appendUnique': '$addToSet',
- 'delete': '$pull',
- 'increment': '$inc'
- }
- def __init__(self, dbName, url=None):
- """Initialize the instance, connecting to the database.
- :param dbName: name of the database
- :type dbName: str (or None to use the dbName passed at initialization)
- :param url: URL of the database
- :type url: str (or None to connect to localhost)
- """
- self._url = url
- self._dbName = dbName
- self.connect(url)
- def connect(self, dbName=None, url=None):
- """Connect to the database.
- :param dbName: name of the database
- :type dbName: str (or None to use the dbName passed at initialization)
- :param url: URL of the database
- :type url: str (or None to connect to localhost)
- :returns: the database we're connected to
- :rtype: :class:`~pymongo.database.Database`
- """
- if self.db is not None:
- return self.db
- if url:
- self._url = url
- if dbName:
- self._dbName = dbName
- if not self._dbName:
- raise MoncoConnectionError('no database name specified')
- self.connection = pymongo.MongoClient(self._url)
- self.db = self.connection[self._dbName]
- return self.db
- def getOne(self, collection, query=None):
- """Get a single document with the specified `query`.
- :param collection: search the document in this collection
- :type collection: str
- :param query: query to filter the documents
- :type query: dict or None
- :returns: the first document matching the query
- :rtype: dict
- """
- results = self.query(collection, convert(query))
- return results and results[0] or {}
- def get(self, collection, _id):
- """Get a single document with the specified `_id`.
- :param collection: search the document in this collection
- :type collection: str
- :param _id: unique ID of the document
- :type _id: str or :class:`~bson.objectid.ObjectId`
- :returns: the document with the given `_id`
- :rtype: dict
- """
- return self.getOne(collection, {'_id': _id})
- def query(self, collection, query=None, condition='or'):
- """Get multiple documents matching a query.
- :param collection: search for documents in this collection
- :type collection: str
- :param query: search for documents with those attributes
- :type query: dict, list or None
- :returns: list of matching documents
- :rtype: list
- """
- db = self.connect()
- query = convert(query or {})
- if isinstance(query, (list, tuple)):
- query = {'$%s' % condition: query}
- return list(db[collection].find(query))
- def add(self, collection, data, _id=None):
- """Insert a new document.
- :param collection: insert the document in this collection
- :type collection: str
- :param data: the document to store
- :type data: dict
- :param _id: the _id of the document to store; if None, it's generated
- :type _id: object
- :returns: the document, as created in the database
- :rtype: dict
- """
- db = self.connect()
- data = convert(data)
- if _id is not None:
- data['_id'] = _id
- _id = db[collection].insert(data)
- return self.get(collection, _id)
- def insertOne(self, collection, data):
- """Insert a document, avoiding duplicates.
- :param collection: update a document in this collection
- :type collection: str
- :param data: the document information to store
- :type data: dict
- :returns: True if the document was already present
- :rtype: bool
- """
- db = self.connect()
- data = convert(data)
- ret = db[collection].update(data, {'$set': data}, upsert=True)
- return ret['updatedExisting']
- def _buildSearchPattern(self, data, searchBy):
- """Return an OR condition."""
- _or = []
- for searchPattern in searchBy:
- try:
- _or.append(dict([(k, data[k]) for k in searchPattern if k in data]))
- except KeyError:
- continue
- return _or
- def update(self, collection, _id_or_query, data, operation='update',
- updateList=None, create=True):
- """Update an existing document or create it, if requested.
- _id_or_query can be an ID, a dict representing a query or a list of tuples.
- In the latter case, the tuples are put in OR; a tuple match if all of its
- items from 'data' are contained in the document.
- :param collection: update a document in this collection
- :type collection: str
- :param _id_or_query: ID of the document to be updated, or a query or a list of attributes in the data that must match
- :type _id_or_query: str or :class:`~bson.objectid.ObjectId` or iterable
- :param data: the updated information to store
- :type data: dict
- :param operation: operation used to update the document or a portion of it, like a list (update, append, appendUnique, delete, increment)
- :type operation: str
- :param updateList: if set, it's considered the name of a list (the first matching element will be updated)
- :type updateList: str
- :param create: if True, the document is created if no document matches
- :type create: bool
- :returns: a boolean (True if an existing document was updated) and the document after the update
- :rtype: tuple of (bool, dict)
- """
- db = self.connect()
- data = convert(data or {})
- _id_or_query = convert(_id_or_query)
- if isinstance(_id_or_query, (list, tuple)):
- _id_or_query = {'$or': self._buildSearchPattern(data, _id_or_query)}
- elif not isinstance(_id_or_query, dict):
- _id_or_query = {'_id': _id_or_query}
- if '_id' in data:
- del data['_id']
- operator = self._operations.get(operation)
- if updateList:
- newData = {}
- for key, value in data.items():
- newData['%s.$.%s' % (updateList, key)] = value
- data = newData
- res = db[collection].find_and_modify(query=_id_or_query,
- update={operator: data}, full_response=True, new=True, upsert=create)
- lastErrorObject = res.get('lastErrorObject') or {}
- return lastErrorObject.get('updatedExisting', False), res.get('value') or {}
- def updateMany(self, collection, query, data):
- """Update multiple existing documents.
- query can be an ID or a dict representing a query.
- :param collection: update documents in this collection
- :type collection: str
- :param query: a query or a list of attributes in the data that must match
- :type query: str or :class:`~bson.objectid.ObjectId` or iterable
- :param data: the updated information to store
- :type data: dict
- :returns: a dict with the success state and number of updated items
- :rtype: dict
- """
- db = self.connect()
- data = convert(data or {})
- query = convert(query)
- if not isinstance(query, dict):
- query = {'_id': query}
- if '_id' in data:
- del data['_id']
- return db[collection].update(query, {'$set': data}, multi=True)
- def delete(self, collection, _id_or_query=None, force=False):
- """Remove one or more documents from a collection.
- :param collection: search the documents in this collection
- :type collection: str
- :param _id_or_query: unique ID of the document or query to match multiple documents
- :type _id_or_query: str or :class:`~bson.objectid.ObjectId` or dict
- :param force: force the deletion of all documents, when `_id_or_query` is empty
- :type force: bool
- :returns: dictionary with the number or removed documents
- :rtype: dict
- """
- if not _id_or_query and not force:
- return
- db = self.connect()
- if not isinstance(_id_or_query, dict):
- _id_or_query = {'_id': _id_or_query}
- _id_or_query = convert(_id_or_query)
- return db[collection].remove(_id_or_query)
|