Source code for adaptivemd.mongodb.object

##############################################################################
# adaptiveMD: A Python Framework to Run Adaptive Molecular Dynamics (MD)
#             Simulations on HPC Resources
# Copyright 2017 FU Berlin and the Authors
#
# Authors: Jan-Hendrik Prinz
# Contributors:
#
# `adaptiveMD` is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 2.1
# of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with MDTraj. If not, see <http://www.gnu.org/licenses/>.
##############################################################################

# part of the code below was taken from `openpathsampling` see
# <http://www.openpathsampling.org> or
# <http://github.com/openpathsampling/openpathsampling
# for details and license


import logging
from uuid import UUID
from weakref import WeakValueDictionary

from base import StorableMixin
from cache import MaxCache, Cache, NoCache, \
    WeakLRUCache
from proxy import LoaderProxy

logger = logging.getLogger(__name__)


[docs]class ObjectStore(StorableMixin): """ Base Class for storing complex objects in a netCDF4 file. It holds a reference to the store file.` Attributes ---------- content_class : :obj:`mongodb.base.StorableMixin` a reference to the class type to be stored using this Storage. Must be subclassed from :obj:`mongodb.base.StorableMixin` cache : :py:class:`mongodb.cache.Cache` a dictionary that holds references to all stored elements by index or string for named objects. This is only used for cached access if caching is not False. Must be of type :obj:`mongodb.base.StorableMixin` or subclassed. """ _restore_non_initial_attr = False allowed_types = [ 'int', 'float', 'long', 'str', 'bool', 'numpy.float32', 'numpy.float64', 'numpy.int8', 'numpy.inf16', 'numpy.int32', 'numpy.int64', 'numpy.uint8', 'numpy.uinf16', 'numpy.uint32', 'numpy.uint64', 'index', 'length', 'uuid' ] default_store_chunk_size = 256 default_cache = 10000
[docs] def __init__(self, name, content_class): """ Parameters ---------- name : str the name of the store content_class : class the base class of the content, must be subclassed from `StorableMixin` """ super(ObjectStore, self).__init__() self._storage = None self.content_class = content_class self.cache = NoCache() self._free = set() self._cached_all = False self._created = False self._document = None self.name = name self.attribute_list = {} self.cv = {} # This will not be stored since its information is contained in the # dimension names self._dimension_name_store = None self.variables = dict() self.units = dict() self.index = None self.proxy_index = WeakValueDictionary() if self.content_class is not None \ and not issubclass(self.content_class, StorableMixin): raise ValueError( 'Content class "%s" must be subclassed from StorableMixin.' % self.content_class.__name__)
[docs] def is_created(self): return self._created
[docs] def to_dict(self): return { 'content_class': self.content_class, 'name': self.name }
[docs] def check_size(self): """ Perform an update in case the DB has been extended by an external source Returns ------- bool returns True if an update was performed """ if len(self) > len(self.index): self.load_indices() return True return False
[docs] def register(self, storage): """ Associate the object store to a specific storage with a given name Parameters ---------- storage : :class:`mongodb.NetCDFPlus` the storage to be associated with """ self._storage = storage self.name = self.name self.index = self.create_uuid_index() self._document = storage.db[self.name]
@staticmethod
[docs] def create_uuid_index(): return []
[docs] def restore(self): self.load_indices()
[docs] def load_indices(self): # self.index.clear() # self.index.extend( self.index = [int(UUID(x)) for x in self._document.distinct('_id')]
@property def storage(self): """Return the associated storage object Returns ------- :class:`mongodb.NetCDFPlus` the referenced storage object """ if self._storage is None: raise RuntimeError( 'A storage needs to be added to this store to be used! ' 'Use .register() to do so.') return self._storage def __str__(self): return repr(self) def __repr__(self): return 'store.%s[%s] : %s' % ( self.name, self.content_class.__name__ if self.content_class is not None else 'None/ANY', str(len(self)) + ' object(s)' ) @property def simplifier(self): """ Return the simplifier instance used to create JSON serialization Returns ------- :class:`mongodb.dictify.StorableObjectJSON` the simplifier object used in the associated storage """ return self.storage.simplifier
[docs] def set_caching(self, caching): """ Set the caching mode for this store Parameters ---------- caching : :class:`mongodb.Cache` """ if caching is None: caching = self.default_cache if caching is True: caching = MaxCache() elif caching is False: caching = NoCache() elif type(caching) is int: caching = WeakLRUCache(caching) if isinstance(caching, Cache): self.cache = caching.transfer(self.cache)
[docs] def idx(self, obj): """ Return the index in this store for a given object Parameters ---------- obj : :class:`mongodb.base.StorableMixin` the object that can be stored in this store for which its index is to be returned Returns ------- int or None The integer index of the given object or None if it is not stored yet """ return self.index[obj.__uuid__]
def __iter__(self): """ Add iteration over all elements in the storage """ self.check_size() for uuid in list(self.index): yield self.load(uuid) def __len__(self): """ Return the number of stored objects Returns ------- int number of stored objects """ if hasattr(self, '_document'): if self._document: return self._document.count() return 0
[docs] def proxy(self, item): """ Return a proxy of a object for this store Parameters ---------- item : :py:class:`mongodb.base.StorableMixin` or int The item or index that points to an object in this store and to which a proxy is requested. Returns ------- """ if item is None: return None tt = type(item) if tt is long: idx = item elif tt in [str, unicode]: if item[0] == '-': return None idx = int(UUID(item)) else: idx = item.__uuid__ return LoaderProxy(self, idx)
def __contains__(self, item): if item.__uuid__ in self.index: return True if self.check_size(): if item.__uuid__ in self.index: return True return False def __getitem__(self, item): """ Enable numpy style selection of object in the store """ try: if type(item) is int: if item < 0: item += len(self) return self.load(item) elif type(item) is str or type(item) is long: return self.load(item) elif type(item) is list: return [self.load(idx) for idx in item] elif item is Ellipsis: return iter(self) except KeyError: return None
[docs] def get(self, item): try: return self[item] except KeyError: if self.check_size(): try: return self[item] except KeyError: pass return None
[docs] def consume_one(self, test_fnc=None): """ Remove one object and return it in the process Parameters ---------- test_fnc : function only objects that match by this function are considered Returns ------- None or `StorableMixin` if None then no object was altered, otherwise the changed object is returned """ consumed = None while consumed is None and len(self) > 0: if test_fnc is None: one = self.one else: try: one = next(t for t in self if test_fnc(t)) except StopIteration: break idx = one.__uuid__ erg = self._document.remove({'_id': str(UUID(int=idx))}) if erg['ok']: consumed = one else: # this means we have a racing condition and the one we found had # had been deleted in the meantime # just retry and get another pass if consumed is not None: self.index.remove(consumed.__uuid__) if consumed.__uuid__ in self.cache: del self.cache[consumed.__uuid__] return consumed
[docs] def modify_one(self, key, value, update): """ Change an attribute of one object Parameters ---------- key : str the attributes name to be changed value : object the old value to be found and changed update : object the new value to the changed into Returns ------- None or `StorableMixin` if None then no object was altered, otherwise the changed object is returned """ modified = None while modified is None and len(self) > 0: erg = self._document.find_and_modify( query={key: value}, update={"$set": {key: update}}, upsert=False ) if erg is not None: # success, we got it idx = int(UUID(erg['_id'])) # remove from cache if idx in self.cache: del self.cache[idx] modified = self.load(idx) return modified
[docs] def modify_test_one(self, test_fnc, key, value, update): """ Change an attribute of one object that matches a function Parameters ---------- test_fnc : function only objects that match by this function are considered key : str the attributes name to be changed value : object the old value to be found and changed update : object the new value to the changed into Returns ------- None or `StorableMixin` if None then no object was altered, otherwise the changed object is returned """ modified = None while modified is None and len(self) > 0: try: found_ones = self._document.find({key: value}) one = next(t for t in ( self.load(int(UUID(f['_id']))) for f in found_ones) if test_fnc(t)) except StopIteration: break idx = one.__uuid__ erg = self._document.find_and_modify( query={key: value, '_id': str(UUID(int=idx))}, update={"$set": {key: update}}, upsert=False ) if erg is not None: # success, we got it # remove from cache if idx in self.cache: del self.cache[idx] modified = self.load(idx) return modified
def _load(self, idx): obj = self.storage.simplifier.from_simple_dict( self._document.find_one({'_id': str(UUID(int=idx))})) obj.__store__ = self return obj
[docs] def clear_cache(self): """Clear the cache and force reloading""" self.cache.clear() self._cached_all = False
[docs] def cache_all(self): """Load all samples as fast as possible into the cache""" if not self._cached_all: idxs = range(len(self)) jsons = self.variables['json'][:] [self.add_single_to_cache(i, j) for i, j in zip( idxs, jsons)] self._cached_all = True
def _save(self, obj): dct = self.storage.simplifier.to_simple_dict(obj) self._document.insert(dct) obj.__store__ = self @property def last(self): """ Returns the last generated trajectory. Useful to continue a run. Returns ------- :py:class:`mongodb.base.StorableMixin` the last stored object in this store """ return self.load(len(self) - 1) @property def first(self): """ Returns the first stored object. Returns ------- :py:class:`mongodb.base.StorableMixin` the actual first stored object """ return self.load(0) @property def one(self): """ Returns one random object. Returns ------- `StorableMixin` the content of the store """ idx = int(UUID(self._document.find_one()['_id'])) return self.load(idx) @property def last(self): """ Returns the last saved object. This is only accurate to seconds! Returns ------- `StorableMixin` the content of the store """ idx = int(UUID(self._document.find_one(sort=[("_time", -1)])['_id'])) return self.load(idx) @property def first(self): """ Returns the first saved object. This is only accurate to seconds! Returns ------- `StorableMixin` the content of the store """ idx = int(UUID(self._document.find_one(sort=[("_time", 1)])['_id'])) return self.load(idx)
[docs] def free(self): """ Return the number of the next free index for this store Returns ------- index : int the number of the next free index in the storage. Used to store a new object. """ idx = len(self) return idx
[docs] def initialize(self): """ Initialize the associated storage to allow for object storage. Mainly creates an index dimension with the name of the object. """ self._created = True
# ========================================================================== # LOAD/SAVE DECORATORS FOR CACHE HANDLING # ==========================================================================
[docs] def find_one(self, dct): idx = self._document.find_one(dct)['_id'] return self.load(int(UUID(idx)))
[docs] def load(self, idx): """ Returns an object from the storage. Parameters ---------- idx : int the integer index of the object to be loaded Returns ------- :py:class:`mongodb.base.StorableMixin` the loaded object """ if type(idx) is str: idx = int(UUID(self._document.find_one({'name': idx})['_id'])) if type(idx) is long: if idx not in self.index: self.check_size() if idx not in self.index: raise ValueError( 'str %s not found in storage' % idx) else: raise ValueError(( 'indices of type "%s" are not allowed in named storage ' '(only str and long)') % type(idx).__name__ ) # if it is in the cache, return it try: obj = self.cache[idx] logger.debug('Found IDX #' + str(idx) + ' in cache. Not loading!') return obj except KeyError: pass logger.debug( 'Calling load object of type `%s` @ IDX #%d' % (self.content_class.__name__, idx)) obj = self._load(idx) logger.debug( 'Calling load object of type %s and IDX # %d ... DONE' % (self.content_class.__name__, idx)) if obj is not None: # update cache there might have been a change due to naming self.cache[idx] = obj logger.debug( 'Try loading UUID object of type %s and IDX # %d ... DONE' % (self.content_class.__name__, idx)) logger.debug( 'Finished load object of type %s and IDX # %d ... DONE' % (self.content_class.__name__, idx)) return obj
@staticmethod
[docs] def reference(obj): return obj.__uuid__
[docs] def save(self, obj): """ Saves an object to the storage. Parameters ---------- obj : :class:`mongodb.base.StorableMixin` the object to be stored """ uuid = obj.__uuid__ if uuid in self.index: # has been saved so quit and do nothing return self.reference(obj) if isinstance(obj, LoaderProxy): if obj._store is self: # is a proxy of a saved object so do nothing return uuid else: # it is stored but not in this store so we try storing the # full attribute which might be still in cache or memory # if that is not the case it will be stored again. This can # happen when you load from one store save to another. And load # again after some time while the cache has been changed and try # to save again the loaded object. We will not explicitly store # a table that matches objects between different storages. return self.save(obj.__subject__) if not isinstance(obj, self.content_class): raise ValueError(( 'This store can only store object of base type "%s". Given ' 'obj is of type "%s". You might need to use another store.') % (self.content_class, obj.__class__.__name__) ) # mark as saved so circular dependencies will not cause infinite loops n_idx = len(self.index) self.index.append(uuid) logger.debug('Saving ' + str(type(obj)) + ' using IDX #' + str(uuid)) try: self._save(obj) self.cache[uuid] = obj except: # in case we did not succeed remove the mark as being saved del self.index[n_idx] raise return self.reference(obj)
[docs] def add_single_to_cache(self, idx, json): """ Add a single object to cache by json Parameters ---------- idx : int the index where the object was stored json : str json string the represents a serialized version of the stored object """ if idx not in self.cache: obj = self.simplifier.from_json(json) # self._get_id(idx, obj) self.cache[idx] = obj self.index[obj.__uuid__] = idx return obj