Source code for adaptivemd.bundle

##############################################################################
# adaptiveMD: A Python Framework to Run Adaptive Molecular Dynamics (MD)
#             Simulations on HPC Resources
# Copyright 2017 FU Berlin and the Authors
#
# Authors: Jan-Hendrik Prinz
# Contributors:
#
# `adaptiveMD` is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 2.1
# of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with MDTraj. If not, see <http://www.gnu.org/licenses/>.
##############################################################################

"""
Bundle - A set-enhancement to add filtering and store handling capabilities

A bundle can be accessed as a normal set using iteration. You can add objects
using `.add(item)` if the bundle is not a view

Examples
--------
Some basic functions

>>> bundle = Bundle(['10', '20', 1, 2, 3])
>>> str_view = bundle.c(basestring)  # only how strings
>>> print list(str_view)  # ['10', '20']
>>> fnc_view = bundle.v(lambda x: int(x) < 3)
>>> print list(fnc_view) # [1, 2]

Some `File` specific functions

>>> import adaptivemd as amd
>>> bundle = Bundle([amd.File('0.dcd'), amd.File('a.pdb')])
>>> file_view = bundle.f('*.dcd')
>>> print list(file_view)  # [File('0.dcd')]

Logic operations produce view on the resulting bundle

>>> and_bundle = str_view & fnc_view
>>> print list(and_bundle)  # []
>>> and_bundle = str_view | fnc_view
>>> print list(and_bundle)  # [1, 2, '10', '20']

A `StorableBundle` is attached to a mongodb store (a stored object list).
Adding will append the object to the store if not stored yet. All iteration
and views will always be kept synced with the DB store content.

>>> p = amd.Project('test-project')
>>> store = StoredBundle()  # new bundle
>>> store.set_store(p.storage.trajectories)  # attach to DB
>>> print list(store)  # show all trajectories
>>> len_store = store.v(lambda x: len(x) > 10)  # all trajs with len > 10
>>> print list(len_store)

Set do not have ordering so some functions do not make sense. As long as
you are working with storable objects (subclassed from `StorableMixin`)
you have some time-ordering (accurate to seconds)

>>> print store.first  # get the earlist created object
>>> print store.one    # get one (any) single object
>>> print store.last   # get the last created object

A bundle is mostly meant to work with storable objects (but does not have to)
To simplify access to certain attributes or apply function to all members you
can use the `.all` attribute and get a _delegator_ that will apply and
attribute or method to all objects

>>> print len_store.all.length  # print all lengths of all objects in len_store
>>> print store.all.path  # print all path of all trajectories
>>> # call `.execute('shutdown') on all workers in the `.workers` bundle
>>> print p.workers.all.execute('shutdown')

"""

import fnmatch
import random

import logging
logger = logging.getLogger(__name__)


[docs]class BaseBundle(object): """ BaseClass for Bundle functionality a special set of storable objects """ def __iter__(self): return iter([]) def __and__(self, other): if isinstance(other, BaseBundle): return AndBundle(self, other) return NotImplemented def __len__(self): return len([None for _ in self]) def __or__(self, other): if isinstance(other, BaseBundle): return OrBundle(self, other) return NotImplemented def __getitem__(self, item): """ Get by name Parameters ---------- item : str in this case it acts like a dict and you can ask for one object with a certain name Returns ------- object """ for f in self: if hasattr(f, 'name') and f.name == item: return f
[docs] def c(self, cls): """ Return a view bundle on all entries that are instances of a class Parameters ---------- cls : `type` a class to be filtered by Returns ------- `ViewBundle` the read-only bundle showing filtered entries """ return ViewBundle(self, lambda x: isinstance(x, cls))
[docs] def f(self, pattern): """ Return a view bundle on all entries that match a location pattern Works only when all objects are of type `File` Parameters ---------- pattern : str a string CL pattern using wildcards to match a filename Returns ------- `ViewBundle` the read-only bundle showing filtered entries """ return ViewBundle(self, lambda x: fnmatch.fnmatch(x.location, pattern))
[docs] def sorted(self, key): """ Return a view bundle where all entries are sorted by a given key attribute Parameters ---------- key : function a function to compute the key to be sorted by Returns ------- `ViewBundle` the read-only bundle showing sorted entries """ return SortedBundle(self, key)
[docs] def v(self, fnc): """ Return a view bundle on all entries that are filtered by a function Parameters ---------- fnc : function a function to be used for filtering Returns ------- `ViewBundle` the read-only bundle showing filtered entries """ return ViewBundle(self, fnc)
[docs] def pick(self): """ Pick a random element Returns ------- object or None a random object if bundle is not empty """ if self: return random.choice(tuple(self)) else: return None
def __str__(self): return '<%s for with %d file(s) @ %s>' % ( self.__class__.__name__, len(self), hex(id(self))) def __contains__(self, item): for o in self: if o == item: return True return False @property def one(self): """ Return one element from the list Use only if you just need one and do not care which one it is Returns ------- object one object (there is no guarantee that this will always be the same element) """ if len(self) > 0: return next(iter(self)) else: return None @property def all(self): """ Return a Delegator that will apply attribute and function call to all bundle elements Returns ------- `BundleDelegator` the delegator object to map to all elements in the bundle """ return BundleDelegator(self)
[docs]class Bundle(BaseBundle): """ A container of objects """
[docs] def __init__(self, iterable=None): super(Bundle, self).__init__() if iterable is None: self._set = set() elif isinstance(iterable, set): self._set = iterable else: self._set = set(iterable)
def __len__(self): if self._set is not None: return len(self._set) else: return 0
[docs] def update(self, iterable): """ Add multiple items to the bundle at once Parameters ---------- iterable : Iterable the items to be added """ map(self.add, iterable)
[docs] def add(self, item): """ Add a single item to the bundle Parameters ---------- item : object """ if self._set is not None: self._set.add(item)
def __iter__(self): if self._set is not None: return iter(self._set) else: return iter([])
[docs]class LogicBundle(BaseBundle): """ Implement simple and and or logic for bundles """
[docs] def __init__(self, bundle1, bundle2): super(LogicBundle, self).__init__() self.bundle1 = bundle1 self.bundle2 = bundle2
[docs]class AndBundle(LogicBundle): """ And logic """ def __iter__(self): return iter(set(self.bundle1) & set(self.bundle2))
[docs]class OrBundle(LogicBundle): """ Or logic """ def __iter__(self): return iter(set(self.bundle1) | set(self.bundle2))
[docs]class ViewBundle(BaseBundle): """ A view on a bundle where object are filtered by a bool function """
[docs] def __init__(self, bundle, view): super(ViewBundle, self).__init__() self.bundle = bundle self.view = view
def __iter__(self): for o in self.bundle: if self.view(o): yield o
[docs]class SortedBundle(BaseBundle): """ Sorted view of a bundle """
[docs] def __init__(self, bundle, key): self.bundle = bundle self.key = key
def __iter__(self): return iter(sorted(self.bundle, key=self.key)) @property def first(self): """ object Return the first of the sorted elements """ return next(iter(self))
[docs]class BundleDelegator(object): """ Delegate an attribute call to all elements in a bundle """
[docs] def __init__(self, bundle): self._bundle = bundle
def __getattr__(self, item): one = self._bundle.one if hasattr(one, item): attr = getattr(one, item) if callable(attr): return FunctionDelegator(self._bundle, item) else: return [getattr(x, item) for x in self._bundle] else: AttributeError('Not all objects have attribute `%s`' % item)
[docs]class FunctionDelegator(object): """ Delegate a function call to all elements in a bundle """
[docs] def __init__(self, bundle, item): self._bundle = bundle self._item = item
def __call__(self, *args, **kwargs): return [getattr(x, self._item)(*args, **kwargs) for x in self._bundle]
[docs]class StoredBundle(Bundle): """ A stored bundle in a mongodb This is a useful wrapper to turn a store of the MongoDB into a bundle of objects. Adding files will store new elements. The bundle is always in sync with the DB. """
[docs] def __init__(self): super(StoredBundle, self).__init__() self._set = None
[docs] def set_store(self, store): """ Set the used store Parameters ---------- store : `ObjectStore` a mongodb store that contains the elements in the bundle """ self._set = store return self
[docs] def close(self): """ Close the connection to the bundle. A not connected bundle will have no entries and none can be added """ self._set = None
[docs] def add(self, item): """ Add an element to the bundle Parameters ---------- item : object the item to be added to the bundle """ if self._set is not None and item not in self._set: logger.info('Added file of type `%s`' % item.__class__.__name__) self._set.save(item)
@property def last(self): """ Return the entry with the latest timestamp Returns ------- object the latest object """ if self._set is not None: return self._set.last @property def first(self): """ Return the entry with the earliest timestamp Returns ------- object the earliest object """ if self._set is not None: return self._set.first def __getitem__(self, item): # this is faster for storages if self._set is not None: return self._set[item]
[docs] def consume_one(self): """ Picks and removes one (random) element in one step. Returns ------- `StorableMixin` or None The deleted object if possible otherwise None """ if self._set is not None: return self._set.consume_one() return None
[docs] def find_all_by(self, key, value): """ Return all elements from the bundle where its key matches value Parameters ---------- key : str the attribute value : object the value to match against using `==` Returns ------- list of `StorableMixin` a list of objects in the bundle that match the search """ if self._set is not None: return [x for x in self._set if getattr(x, key) == value]