##############################################################################
# adaptiveMD: A Python Framework to Run Adaptive Molecular Dynamics (MD)
# Simulations on HPC Resources
# Copyright 2017 FU Berlin and the Authors
#
# Authors: Jan-Hendrik Prinz
# Contributors:
#
# `adaptiveMD` is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 2.1
# of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with MDTraj. If not, see <http://www.gnu.org/licenses/>.
##############################################################################
# Create compute units for various simulation tools
import random
import os
from adaptivemd.file import File
from adaptivemd.generator import TaskGenerator
from adaptivemd.mongodb import StorableMixin, ObjectSyncVariable
from adaptivemd.task import Task
[docs]class Engine(TaskGenerator):
"""
An generator for trajectory simulation tasks
"""
[docs] def __init__(self):
super(Engine, self).__init__()
self.types = {}
# set default output type if nothing is specified
self.add_output_type('master', 'output.dcd', 1)
[docs] @classmethod
def from_dict(cls, dct):
obj = super(Engine, cls).from_dict(dct)
obj.types = dct['types']
return obj
[docs] def to_dict(self):
dct = super(Engine, self).to_dict()
dct.update({
'types': self.types})
return dct
[docs] def run(self, target):
"""
Create a task that returns a trajectory given in the input
Parameters
----------
target : `Trajectory`
location of the created target trajectory
Returns
-------
`Task`
the task object containing the job description
"""
return None
[docs] def extend(self, target, length):
"""
Create a task that extends a trajectory given in the input
Parameters
----------
target : `Trajectory`
location of the target trajectory to be extended
length : int
number of additional frames to be computed
Returns
-------
`Task`
the task object containing the job description
"""
return None
# def file_generators(self):
# """
# Return a list of function to be run with certain classes
#
# `Trajectory` is a natural object of engine and giving a trajectory including its
# initial frame and length is enough to tell the `Engine` on what to generate. Since
# this is enough we can define that using a `Trajectory` object in `Scheduler.submit`
# will result in a simulation task.
#
# Returns
# -------
# dict of `type`: function
# the dict describing with function to run with which object type
#
# """
# return {
# Trajectory: self.run
# }
[docs] def add_output_type(self, name, filename=None, stride=1, selection=None):
"""
Add an output type for a trajectory kind to be generated by this engine
Parameters
----------
name : str
the name to call the output type by
filename : str
a filename to be used for this output type
stride : int
the stride used by this particular trajectory relative to the
native steps of the engine.
selection : str
an mdtraj.Topology.select type filter string to store only a subset
of atoms
"""
self.types[name] = OutputTypeDescription(filename, stride, selection)
@property
def native_stride(self):
"""
The least common multiple stride of all generated trajectories.
If you want consistent trajectory length your simulation length need to be
multiples of this number. The number is relative to the native time steps
Returns
-------
int
the lcm stride relative to the engines timesteps
"""
return lcmm(*[x.stride for x in self.types.values()])
@property
def full_strides(self):
"""
list of strides for trajectories that have full coordinates
this is useful to figure out from which frames you can restart a new
trajectory. Usually you only have a single one with full frames.
Returns
-------
list of int
the list of strides for full trajectories
"""
return [x.stride for x in self.types.values() if x.selection is None]
[docs]def gcd(a, b):
"""
Return greatest common divisor using Euclid's Algorithm.
"""
while b:
a, b = b, a % b
return a
[docs]def lcm(a, b):
"""
Return lowest common multiple.
"""
return a * b // gcd(a, b)
[docs]def lcmm(*args):
"""
Return lcm of args.
"""
return reduce(lcm, args)
# ------------------------------------------------------------------------------
# FILE TYPES
# ------------------------------------------------------------------------------
[docs]class Trajectory(File):
"""
Represents a trajectory :class:`File` on the cluster
Attributes
----------
location : str or `File`
the `File` location
frame : `Frame` or `File`
the initial frame used for the trajectory
length : int
the length of the trajectory in frames
engine : `Engine`
the engine used to create the trajectory
"""
_find_by = ['created', 'state', 'task', 'engine']
engine = ObjectSyncVariable('engine', 'generators', lambda x: not bool(x))
[docs] def __init__(self, location, frame, length, engine=None):
super(Trajectory, self).__init__(location)
self.frame = frame
self.length = length
self.engine = engine
[docs] def clone(self):
return Trajectory(self.location, self.frame, self.length, self.engine)
def __len__(self):
return self.length
def __getitem__(self, item):
if 0 <= item <= len(self):
return Frame(self, item)
else:
return None
def __repr__(self):
return "Trajectory(%r >> %s[0..%d])" % (
self.frame, self.basename, self.length)
[docs] def pick(self):
"""
Return a random frame from all possible full frames
Returns
-------
`Frame`
the frame you can restart from
"""
# only use existing frames (strides!)
frames = self.existing_frames
idx = random.randint(0, len(frames) - 1)
return self[frames[idx]]
@property
def is_folder(self):
# we treat trajectories from now on as Directories
return True
[docs] def file(self, f):
"""
Return a file location to a file inside the trajectory folder
Parameters
----------
f : str or `OutputTypeDescription`
the filename to be appended to the trajectories directory
Returns
-------
`File`
the object containing the location
"""
if isinstance(f, basestring):
return File(os.path.join(self.location, f))
elif isinstance(f, OutputTypeDescription):
return self.file(f.filename)
[docs] def run(self):
"""
Return a task to run this engine
Returns
-------
`Task`
the task object that can be submitted to the queue
"""
# todo: check that you can generate one trajectory object only once
# not just the task for it
if self.engine:
return self.engine.run(self)
else:
return None
[docs] def extend(self, length):
"""
Get a task to extend this trajectory if the engine is set
Parameters
----------
length : int or list of int
the length to extend by as a single int or a list of ints
Returns
-------
`Task`
the task object to extend the trajectory
"""
if self.engine:
if isinstance(length, int):
length = [length]
# make sure we have a list now
assert(isinstance(length, (tuple, list)))
x = self
for l in length:
x = x.engine.extend(x, l)
return x
else:
return None
[docs] def outputs(self, outtype):
"""
Get a location to the file containing the output by given name
Parameters
----------
outtype : str or `OutputTypeDescription`
the name of the outputtype as str or the full description object
Returns
-------
`File`
a file location that points to the concrete file that contains
the data for a particular output type
"""
if self.engine:
if isinstance(outtype, basestring):
if outtype in self.engine.types:
return self.file(self.engine.types[outtype])
elif isinstance(outtype, OutputTypeDescription):
return self.file(outtype)
return None
@property
def types(self):
"""
Return the OutputTypeDescriptions for this trajectory
Returns
-------
dict str: `OutputTypeDescription`
the output description dict of the engine
"""
if self.engine:
return self.engine.types
return None
@property
def existing_frames(self):
"""
Returns
-------
list of int
a sorted list of frame indices with full coordinates that can be
used for restart. relative to the engines timesteps
"""
full_strides = self.engine.full_strides
frames = set()
l = len(self) + 1
for stride in full_strides:
frames.update(range(0, l, stride))
return sorted(frames)
[docs]class Frame(StorableMixin):
"""
Represents a frame of a trajectory
Attributes
----------
trajectory : `Trajectory`
the origin trajectory
index : int
the frame index staring from zero
"""
[docs] def __init__(self, trajectory, index):
super(Frame, self).__init__()
self.trajectory = trajectory
self.index = index
def __repr__(self):
return 'Frame(%s[%d])' % (self.trajectory.short, self.index)
@property
def index_in_outputs(self):
"""
Return output type and effective frame index for this frame
Returns
-------
str
the name of the output type
int
the effective index within this trajectory obeying the trajectories
own stride
"""
absolute_idx = self.index
if absolute_idx > self.trajectory.length:
return None, None
if self.trajectory.types:
for key, desc in self.trajectory.types.iteritems():
stride = desc.stride
if desc.selection is None:
# full atoms
if absolute_idx % stride == 0:
# picked a frame that exists in this stride
return key, absolute_idx / stride
return None, None
@property
def exists(self):
"""
Returns
-------
bool
if True there is a concrete trajectory file with full
coordinates for this frame
"""
ty, idx = self.index_in_outputs
return ty is not None
[docs]class TrajectoryGenerationTask(Task):
"""
A task that will generate a trajectory
"""
_copy_attributes = Task._copy_attributes + [
'trajectory'
]
def _default_success(self, scheduler):
super(TrajectoryGenerationTask, self)._default_success(scheduler)
# # give the used engine the credit for making the trajectory
# for t in self.targets:
# if isinstance(t, Trajectory):
# t.engine = self.generator
def __init__(self, generator=None, trajectory=None):
super(TrajectoryGenerationTask, self).__init__(generator)
# set this engine to be run by this
self.trajectory = trajectory
if trajectory:
trajectory.engine = self.generator
[docs] def extend(self, length):
"""
Extend the trajectory that was generated by this task
Parameters
----------
length : int
the number of frames resp to native engine timesteps
Returns
-------
`Task`
a task to extend the current trajectory
"""
t = self.generator.extend(self.trajectory, length)
# this is not really necessary since we require internally that the
# source exists but this will cause all dependencies to be
# submitted, too
t.dependencies = [self]
return t
[docs]class TrajectoryExtensionTask(TrajectoryGenerationTask):
"""
A task that generates a trajectory out of a source trajectory
"""
_copy_attributes = TrajectoryGenerationTask._copy_attributes + [
'source'
]
def __init__(self, generator=None, trajectory=None, source=None):
super(TrajectoryExtensionTask, self).__init__(generator, trajectory)
self.source = source
@property
def ready(self):
# an extension is ready to be executed, if the source also exists!
if not self.source.exists:
return False
# and dependencies need to be done
if not self.dependency_okay:
return False
return True
[docs]class OutputTypeDescription(StorableMixin):
"""
A description of a general trajectory type
Attributes
----------
filename : str
a filename to store these type of trajectory in
stride : int
the stride to be used relative to native engine timesteps
selection : str
a :meth:`mdtraj.Topolopgy.select` like selection of an atom subset
"""
def __init__(self, filename=None, stride=1, selection=None):
super(OutputTypeDescription, self).__init__()
if filename is None:
filename = 'stride-%d.dcd' % stride
self.filename = filename
self.stride = stride
self.selection = selection