AdaptiveMD¶
Documentation¶
installation¶
adaptiveMD¶
A Python framework to run adaptive Markov state model (MSM) simulation on HPC resources
The generation of MSMs requires a huge amount of trajectory data to be analyzed. In most cases this leads to an enhanced understanding of the dynamics of the system which can be used to make decision about collection more data to achieve a desired accuracy or level of detail in the generated MSM. This alternating process between simulation/actively generating new observations and analysis is currently difficult and involves lots of human decision along the path.
This framework aim to automate this process with the following goals:
- Ease of use: Simple system setup once an HPC has been added.
- Flexibility: Modular setup, attach to multiple HPCs and different simulation engines
- Automatism: Create an user-defined adaptive strategy that is executed
- Compatibility: Build analysis tools and export to known formats
Prerequisites¶
There are a few things we need to install to make this work.
MongoDB¶
AdaptiveMD and RP both need access to a MongoDB. The FU has one that Allegro can access in place and you can use this for storing projects. If you want to store these locally you need to install MongoDB.
Just download your OS installer from MongoDB Community Edition and follow the installation instructions. This is very straight forward and should work without any problems. You only need to install MongoDB on your local machine from which you will connect to the cluster. No need to install it on the cluster.
curl -O https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-debian81-3.4.2.tgz
tar -zxvf mongodb-linux-x86_64-debian81-3.4.2.tgz
mkdir -p ~/mongodb
cp -R -n mongodb-linux-x86_64-debian81-3.4.2/ ~/mongodb
# add PATH to .bashrc
echo "export PATH=~/mongodb/bin:$PATH" >> ~/.bash_rc
# create directory for storage (everywhere you have space)
mkdir -p ~/mongodb/data/db
# run the deamon in the background
mongod --quiet --dbpath ~/mongodb/data/db &
Conda¶
Whereever you will run the actual tasks (local or a cluster) you probably use some python so we recommend to install the common set of conda packages. If you are remotely executing python then you can even use python 3 without problems. The RPC might also work with python 3 but that needs to be tested.
If you have not yet installed conda please do so using
# curl -O https://repo.continuum.io/miniconda/Miniconda2-latest-Linux-x86_64.sh
# bash Miniconda2-latest-Linux-x86_64.sh
or in analogy for python3
Add 2 useful channels
conda config --append channels conda-forge
conda config --append channels omnia
and --append
will make sure that the regular conda packages are tried first and use conda-forge
and omnia
as a fallback.
Install required and necessary packages now
# for adaptivemd only
conda install ujson pyyaml pymongo=2.8 numpy
# for openmm, pyemma etc
conda install pyemma openmm mdtraj
Install adaptiveMD¶
Let’s get adaptivemd from the github repo now.
# clone and install adaptivemd
git clone git@github.com:markovmodel/adaptivemd.git
# go to adativemd
cd adaptivemd/
# and install it
python setup.py develop
# see if it works
python -c "import adaptivemd" || echo 'FAILED'
# run the mongodb server if not running already
mongod --dbpath={path_to_your_db_folder}
# run a simple test
cd adaptive/tests/
python test_simple.py
All of this must also be installed on the cluster, where you want to run your simulations.
For allegro I suggest to use a miniconda installation. Note that you only need these packages if you want to use some of it on the cluster like run openmm or make computations using pyemma. Just for running, say acemd
, conda is not required!
That’s it. Have fun running adaptive simulations.
To compile the doc pages, clone this github repository, go into the docs
folder and do
conda install sphinx sphinx_rtd_theme pandoc
make html
The HTML pages are in _build/html. Please note that the docs can only be compiled if all the above mentionend AdaptiveMD dependencies are available. If you are using conda environments, this means that your AdaptiveMD environment should be active.
You might want to start with the examples in examples/tutorials
Examples Notebooks¶
Example 1 - Setup¶
First we cover some basics about adaptive sampling to get you going.
We will briefly talk about
- resources
- files
- generators
- how to run a simple trajectory
Imports¶
In [1]:
import sys, os
Alright, let’s load the package and pick the Project
since we want
to start a project
In [2]:
from adaptivemd import Project
Let’s open a project with a UNIQUE name. This will be the name used in the DB so make sure it is new and not too short. Opening a project will always create a non-existing project and reopen an exising one. You cannot chose between opening types as you would with a file. This is a precaution to not accidentally delete your project.
In [3]:
# Use this to completely remove the example-worker project from the database.
Project.delete('tutorial')
In [4]:
project = Project('tutorial')
Now we have a handle for our project. First thing is to set it up to work on a resource.
The Resource
¶
What is a resource?¶
A Resource
specifies a shared filesystem with one or more clusteres
attached to it. This can be your local machine or just a regular cluster
or even a group of cluster that can access the same FS (like Titan, Eos
and Rhea do).
Once you have chosen your place to store your results it is set for the project and can (at least should) not be altered since all file references are made to match this resource.
Let us pick a local resource on your laptop or desktop machine for now. No cluster / HPC involved for now.
In [5]:
from adaptivemd import LocalResource
We now create the Resource object
In [6]:
resource = LocalResource()
Since this object defines the path where all files will be placed, let’s get the path to the shared folder. The one that can be accessed from all workers. On your local machine this is trivially the case.
In [7]:
resource.shared_path
Out[7]:
'$HOME/adaptivemd/'
Okay, files will be placed in $HOME/adaptivemd/
. You can change this
using an option when creating the Resource
LocalCluster(shared_path='$HOME/my/adaptive/folder/')
If you are interested in more information about Resource
setup
consult the documentation about Resource
Last, we save our configured Resource
and initialize our empty
prohect with it. This is done once for a project and should not be
altered.
In [8]:
project.initialize(resource)
Files¶
In [9]:
from adaptivemd import File, Directory
First we define a File
object. Instead of just a string, these are
used to represent files anywhere, on the cluster or your local
application. There are some subclasses or extensions of File
that
have additional meta information like Trajectory
or Frame
. The
underlying base object of a File
is called a Location
.
We start with a first PDB file that is located on this machine at a relative path
In [10]:
pdb_file = File('file://../files/alanine/alanine.pdb')
File
like any complex object in adaptivemd can have a .name
attribute that makes them easier to find later. You can either set the
.name
property after creation, or use a little helper method
.named()
to get a one-liner. This function will set .name
and
return itself.
For more information about the possibilities to specify filelocation
consult the documentation for File
In [11]:
pdb_file.name = 'initial_pdb'
The .load()
at the end is important. It causes the File
object
to load the content of the file and if you save the File
object, the
actual file is stored with it. This way it can simply be rewritten on
the cluster or anywhere else.
In [12]:
pdb_file.load()
Out[12]:
'alanine.pdb'
Generators¶
TaskGenerators are instances whose purpose is to create tasks to be
executed. This is similar to the way Kernels work. A TaskGenerator will
generate Task
objects for you which will be translated into a
ComputeUnitDescription
and executed. In simple terms:
The task generator creates the bash scripts for you that run a simulation or run pyemma.
A task generator will be initialized with all parameters needed to make it work and it will now what needs to be staged to be used.
In [13]:
from adaptivemd.engine.openmm import OpenMMEngine
A task generator that will create jobs to run simulations. Currently it uses a little python script that will excute OpenMM. It requires conda to be added to the PATH variable or at least openmm to be installed on the cluster. If you setup your resource correctly then this should all happen automatically.
So let’s do an example for an OpenMM engine. This is simply a small python script that makes OpenMM look like a executable. It run a simulation by providing an initial frame, OpenMM specific system.xml and integrator.xml files and some additional parameters like the platform name, how often to store simulation frames, etc.
In [14]:
engine = OpenMMEngine(
pdb_file=pdb_file,
system_file=File('file://../files/alanine/system.xml').load(),
integrator_file=File('file://../files/alanine/integrator.xml').load(),
args='-r --report-interval 1 -p CPU'
).named('openmm')
We have now an OpenMMEngine which uses the previously made pdb File
object and uses the location defined in there. The same for the OpenMM
XML files and some args to run using the CPU
kernel, etc.
Last we name the engine openmm
to find it later.
In [15]:
engine.name
Out[15]:
'openmm'
Next, we need to set the output types we want the engine to generate. We
chose a stride of 10 for the master
trajectory without selection and
a second trajectory with only protein atoms and native stride.
Note that the stride and all frame number ALWAYS refer to the native
steps used in the engine. In out example the engine uses 2fs
time
steps. So master stores every 20fs
and protein every 2fs
In [16]:
engine.add_output_type('master', 'master.dcd', stride=10)
engine.add_output_type('protein', 'protein.dcd', stride=1, selection='protein')
In [17]:
from adaptivemd.analysis.pyemma import PyEMMAAnalysis
The instance to compute an MSM model of existing trajectories that you
pass it. It is initialized with a .pdb
file that is used to create
features between the \(c_\alpha\) atoms. This implementaton requires
a PDB but in general this is not necessay. It is specific to my
PyEMMAAnalysis show case.
In [18]:
modeller = PyEMMAAnalysis(
engine=engine,
outtype='protein',
features={'add_inverse_distances': {'select_Backbone': None}}
).named('pyemma')
Again we name it pyemma
for later reference.
The other two option chose which output type from the engine we want to analyse. We chose the protein trajectories since these are faster to load and have better time resolution.
The features dict expresses which features to use. In our case use all inverse distances between backbone c_alpha atoms.
Next step is to add these to the project for later usage. We pick the
.generators
store and just add it. Consider a store to work like a
set()
in python. It contains objects only once and is not ordered.
Therefore we need a name to find the objects later. Of course you can
always iterate over all objects, but the order is not given.
To be precise there is an order in the time of creation of the object, but it is only accurate to seconds and it really is the time it was created and not stored.
In [19]:
project.generators.add(engine)
project.generators.add(modeller)
Note, that you cannot add the same engine twice. But if you create a new engine it will be considered different and hence you can store it again.
Create one initial trajectory¶
Finally we are ready to run a first trajectory that we will store as a point of reference in the project. Also it is nice to see how it works in general.
We are using a Worker approach. This means simply that someone (in our case the user from inside a script or a notebook) creates a list of tasks to be done and some other instance (the worker) will actually do the work.
First we create the parameters for the engine to run the simulation.
Since it seemed appropriate we use a Trajectory
object (a special
File
with initial frame and length) as the input. You could of
course pass these things separately, but this way, we can actualy
reference the no yet existing trajectory and do stuff with it.
A Trajectory should have a unique name and so there is a project function to get you one. It uses numbers and makes sure that this number has not been used yet in the project.
In [20]:
trajectory = project.new_trajectory(engine['pdb_file'], 100, engine)
trajectory
Out[20]:
Trajectory('alanine.pdb' >> [0..100])
This says, initial is alanine.pdb
run for 100 frames and is named
xxxxxxxx.dcd
.
You might wonder why a Trajectory
object is necessary. You could
just build a function that will take these parameters and run a
simulation. At the end it will return the trajectory object. The same
object we created just now.
The main reason is to familiarize you with the general concept of asyncronous execution and so-called Promises. The trajectory object we built is similar to a Promise so what is that exactly?
A Promise is a value (or an object) that represents the result of a
function at some point in the future. In our case it represents a
trajectory at some point in the future. Normal promises have specific
functions do deal with the unknown result, for us this is a little
different but the general concept stands. We create an object that
represents the specifications of a Trajectory
and so, regardless of
the existence, we can use the trajectory as if it would exists:
Get the length
In [21]:
print trajectory.length
100
and since the length is fixed, we know how many frames there are and can access them
In [22]:
print trajectory[20]
Frame(sandbox:///{}/00000000/[20])
ask for a way to extend the trajectory
In [23]:
print trajectory.extend(100)
<adaptivemd.engine.engine.TrajectoryExtensionTask object at 0x112c757d0>
ask for a way to run the trajectory
In [24]:
print trajectory.run()
<adaptivemd.engine.engine.TrajectoryGenerationTask object at 0x112c75450>
We can ask to extend it, we can save it. We can reference specific frames in it before running a simulation. You could even build a whole set of related simulations this way without running a single frame. You might understand that this is pretty powerful especially in the context of running asynchronous simulations.
Last, we did not answer why we have two separate steps: Create the trajectory first and then a task from it. The main reason is educational: > It needs to be clear that a ``Trajectory`` *can exist* before running some engine or creating a task for it. The ``Trajectory`` *is not* a result of a simulation action.
Now, we want that this trajectory actually exists so we have to make it.
This requires a Task
object that knows to describe a simulation.
Since Task
objects are very flexible and can be complex there are
helper functions (i.e. factories) to get these in an easy manner, like
the ones we already created just before. Let’s use the openmm engine to
create an openmm task now.
In [57]:
task = engine.run(trajectory)
As an alternative you can directly use the trajectory (which knows its
engine) and call .run()
In [58]:
task = trajectory.run()
That’s it, just take a trajectory description and turn it into a task that contains the shell commands and needed files, etc.
Finally we need to add this task to the things we want to be done. This
is easy and only requires saving the task to the project. This is done
to the project.tasks
bundle and once it has been stored it can be
picked up by any worker to execute it.
In [32]:
project.queue(task) # shortcut for project.tasks.add(task)
That is all we can do from here. To execute the tasks you need to run a worker using
adaptivemdworker -l tutorial --verbose
Once this is done, come back here and check your results. If you want you can execute the next cell which will block until the task has been completed.
In [33]:
print project.files
print project.trajectories
<StoredBundle for with 6 file(s) @ 0x111fa1150>
<ViewBundle for with 0 file(s) @ 0x111fa1450>
and close the project.
In [27]:
project.close()
The final project.close() will close the DB connection.
Example 2 - Run¶
Example 2 - The Tasks¶
Imports¶
In [1]:
import sys, os
In [2]:
from adaptivemd import Project, ExecutionPlan, Trajectory
Let’s open our test
project by its name. If you completed the
previous example this should all work out of the box.
In [3]:
project = Project('tutorial')
Open all connections to the MongoDB
and Session
so we can get
started.
Let’s see where we are. These numbers will depend on whether you run this notebook for the first time or just continue again. Unless you delete your project it will accumulate models and files over time, as is our ultimate goal.
In [4]:
print project.tasks
print project.trajectories
print project.models
<StoredBundle for with 2 file(s) @ 0x113a29a90>
<ViewBundle for with 1 file(s) @ 0x113a29bd0>
<StoredBundle for with 0 file(s) @ 0x113a299d0>
Now restore our old ways to generate tasks by loading the previously used generators.
In [5]:
engine = project.generators['openmm']
modeller = project.generators['pyemma']
pdb_file = project.files['initial_pdb']
Remember that we stored some files in the database and of course you can look at them again, should that be important.
In [6]:
print pdb_file.get_file()[:1000] + ' [...]'
REMARK 1 CREATED WITH MDTraj 1.8.0, 2016-12-22
CRYST1 26.063 26.063 26.063 90.00 90.00 90.00 P 1 1
MODEL 0
ATOM 1 H1 ACE A 1 -1.900 1.555 26.235 1.00 0.00 H
ATOM 2 CH3 ACE A 1 -1.101 2.011 25.651 1.00 0.00 C
ATOM 3 H2 ACE A 1 -0.850 2.954 26.137 1.00 0.00 H
ATOM 4 H3 ACE A 1 -1.365 2.132 24.600 1.00 0.00 H
ATOM 5 C ACE A 1 0.182 1.186 25.767 1.00 0.00 C
ATOM 6 O ACE A 1 1.089 1.407 26.645 1.00 0.00 O
ATOM 7 N ALA A 2 0.302 0.256 24.807 1.00 0.00 N
ATOM 8 H ALA A 2 -0.588 0.102 24.354 1.00 0.00 H
ATOM 9 CA ALA A 2 1.498 -0.651 24.567 1.00 0.00 C
ATOM 10 HA ALA A 2 1.810 -0.944 25.570 1.00 0.00 H
ATOM 11 CB ALA A 2 1.054 -1.959 23.852 [...]
The Trajectory
object¶
Before we talk about adaptivity, let’s have a look at possibilities to generate trajectories.
We assume that you successfully ran a first trajectory using a worker. Next, we talk about lot’s of ways to generate new trajectories.
You will do this in the beginning. Remember we already have a PDB stored from setting up the engine. if you want to start from this configuration do as before
- create the
Trajectory
object you want - make a task
- submit the task to craft the object into existance on the HPC
A trajectory contains all necessary information to make itself. It has
- a (hopefully unique) location: This will we the folder where all the files that belong to the trajectory go.
- an initial frame: the initial configuration to be used to tell the MD simulation package where to start
- a length in frames to run
- the
Engine
: the actual engine I want to use to create the trajectory.
Note, the Engine
is technically not required unless you want to use
.run()
but it makes sense, because the engine contains information
about the topology and, more importantly information about which output
files are generated. This is the essential information you will need for
analysis, e.g. what is the filename of the trajectory file that contains
the protein structure and what is its stride?
Let’s first build a Trajectory
from scratch
In [7]:
file_name = next(project.traj_name) # get a unique new filename
trajectory = Trajectory(
location=file_name, # this creates a new filename
frame=pdb_file, # initial frame is the PDB
length=100, # length is 100 frames
engine=engine # the engine to be used
)
Since this is tedious to write there is a shortcut
In [8]:
trajectory = project.new_trajectory(
frame=pdb_file,
length=100,
engine=engine,
number=1 # if more then one you get a list of trajectories
)
Like in the first example, now that we have the parameters of the
Trajectory
we can create the task to do that.
The Task
object¶
First, an example
In [9]:
task_run = engine.run(trajectory)
This was easy, but we can do some interesting stuff. Since we know the trajectory will exist now we can also extend by some frames. Remember, the trajectory does not really exist yet (not until we submit it and a worker executes it), but we can pretend that it does, since it’s relevant propertier are set.
In [10]:
task_extend = engine.extend(trajectory, 50)
The only problem is to make sure the tasks are run in the correct order. This would not be a problem if the worker will run tasks in the order they are place in the queue, but that defeats the purpose of parallel runs. Therefore an extended tasks knows that is depends on the existance of the source trajectory. The worker will hence only run a trajectory, once the source exists.
We might wonder at this point how we manage to construct the dependency graph between all tasks and how this is handled and optimized, etc…
Well, we don’t. There is no dependency graph, at least not explicitely. All we do, is to check at a time among all task that should be run, which of there can be run. And this is easy to check, all dependent tasks need to be completed and must have succeeded. Then we can rely on their (the dependencies) results to exists and it makes sense to continue.
A real dependency graph would go even further and know about all future relations and you could identify bottleneck tasks which are necessary to allow other tasks to be run. We don’t do that (yet). It could improve performance in the sense that you will run at optimal load balance and keep all workers as busy as possible. Consider our a attempt a first order dependency graph.
In [11]:
project.queue(task_run, task_extend)
Remember that we allow an engine to output multiple trajectory types with freely chosen strides? This could leave to trouble. Imagine this (unrealistic) situation:
We have 1. full trajectory with stride=10
2. a reduced protein-only
trajectory with stride=7
Now run a trajectory of length=300
. We get
- 30+1 full (+1 for the initial frame) and
- 42+1 protein frames
That per se is no problem, but if you want to extend we only have a restart file for the very last frame and while this works for the full trajectory, for the protein trajectory you are 6 frames short. Just continuing and concatenating the two leads to a gap of 6+7=13 frames instead of 7. A small big potentially significant source of error.
So, compute the least common multiple of all strides using
In [12]:
engine.native_stride
Out[12]:
10
There is also a shorter way of writing this
In [13]:
# task = trajectory.run().extend(50)
This will create two tasks that first runs the trajectory and then extend it by 50 frames (in native engine frames)
If you want to do that several times, you can pass a list of ints which
is a shortcut for calling .extend(l1).extend(l2). ...
In [14]:
# task = trajectory.run().extend([10] * 10)
This will create 10! tasks that eacht will extend the previous one. Each of the task requires the previous one to finish, this way the dependency is preserved. You can use this to mimick using several restarts in between and it also means that you have no idea which worker will actually start and which worker will continue or finish a trajectory.
Checking the results¶
For a seconds let’s see if everything went fine.
In [27]:
for t in project.trajectories:
print t.short, t.length
sandbox:///{}/00000000/ 100
sandbox:///{}/00000002/ 150
If this works, then you should see one 100 frame trajectory from the setup (first example) and a second 150 length trajectory that we just generated by running 100 frames and extending it by another 50.
If not, there might be a problem or (more likely) the tasks are not finished yet. Just try the above cell again and see if it changes to the expected output.
project.trajectories
will show you only existing trajectories. Not
ones, that are planned or have been extended. If you want to see all the
ones already in the project, you can look at project.files
. Which is
a bundle and bundles can be filtered. But first all files
In [28]:
for f in project.files:
print f
file:///Users/jan-hendrikprinz/Studium/git/adaptivemd/adaptivemd/scripts/_run_.py
file:///Users/jan-hendrikprinz/Studium/git/adaptivemd/adaptivemd/engine/openmm/openmmrun.py
file:///Users/jan-hendrikprinz/Studium/git/adaptivemd/examples/files/alanine/alanine.pdb
file:///Users/jan-hendrikprinz/Studium/git/adaptivemd/examples/files/alanine/system.xml
file:///Users/jan-hendrikprinz/Studium/git/adaptivemd/examples/files/alanine/integrator.xml
sandbox:///projects/tutorial/trajs/00000000/
sandbox:///projects/tutorial/trajs/00000002/
sandbox:///projects/tutorial/trajs/00000002/
Now all files filtered by [c]lass Trajectory
. DT
is a little
helper to convert time stamps into something readable.
In [29]:
from adaptivemd import DT
In [30]:
for t in project.files.c(Trajectory):
print t.short, t.length,
if t.created:
if t.created > 0:
print 'created @ %s' % DT(t.created)
else:
print 'modified @ %s' % DT(-t.created)
else:
print 'not existent'
sandbox:///{}/00000000/ 100 created @ 2017-05-26 22:45:00
sandbox:///{}/00000002/ 100 modified @ 2017-05-26 22:46:14
sandbox:///{}/00000002/ 150 created @ 2017-05-26 22:46:14
You see, that the extended trajecory appears twice once with length 100
and once with length 150. This is correct, because at the idea of a 100
frame trajectory was used and hence is saved. But why does this one not
appear in the list of trajectories. It was created first and had a
timestamp of creation written to .created
. This is the time when the
worker finishes and was successful.
At the same time, all files that are overwritten, are marked as modified by setting a negative timestamp. So if
.created is None
, the file does not exist nor has it..created > 0
, the file exists.created < 0
, the file existed but has been overwritten
Finally, all project.trajectories
are files of type Trajectory
with positive created
index.
Dealing with errors¶
Let’s do something stupid and produce an error by using a wrong initial pdb file.
In [31]:
trajectory = project.new_trajectory(engine['system_file'], 100)
task = engine.run(trajectory)
project.queue(task)
Well, nothing changed obviously and we expect it to fail. So let’s inspect what happened.
In [35]:
task.state
Out[35]:
u'fail'
You might need to execute this cell several times. It will first become
queued
, then running
and finally fail
and stop there.
It failed, well, we kind of knew that. No suprise here, but why? Let’s look at the stdout and stderr
In [36]:
print task.stdout
22:46:34 [worker:3] stdout from running task
GO...
Reading PDB
In [37]:
print task.stderr
22:46:34 [worker:3] stderr from running task
Traceback (most recent call last):
File "openmmrun.py", line 169, in <module>
pdb = PDBFile(args.topology_pdb)
File "/Users/jan-hendrikprinz/anaconda/lib/python2.7/site-packages/simtk/openmm/app/pdbfile.py", line 159, in __init__
self.positions = self._positions[0]
IndexError: list index out of range
We see, what we expect. In openmmrun.py
the openmm executable it
could not load the pdb file.
NOTE If your worker dies for some reason, it will not set a STDOUT or STDERR. If you think that your task should be able to execute, then you can dotask.state = 'created'
and reset it to be accessible to workers. This is NOT recommended, just to explain how this works. Of course you need a new worker anyway.
What else¶
If you have a Trajectory
object and create the real trajectory file,
you can also put the Trajectory
directly into the queue. This is
equivalent to call .run
on the trajectory and submit the resulting
Task
to the queue. The only downside is that you do not see the task
object and cannot directly work with it, check it’s status, etc…
In [38]:
# project.queue(project.new_trajectory(pdb_file, 100, engine).run()) can be called as
project.queue(project.new_trajectory(pdb_file, 100, engine))
Trajectories from other trajectories¶
This will be the most common case. At least in any remote kind of adaptivity you will not start always from the same position or extend. You want to pick any exisiting frame and continue from there. So, let’s do that.
First we get a trajectory. Every Bundle
in the project (e.g.
.trajectories
, .models
, .files
, .tasks
) acts like an
enhanced set. You can iterate over all entries as we did before, and you
can get one element, which usually is the first stored, but not always.
If you are interested in Bundle
s see the documentation. For now
that is enough to know, that a bundle (as a set) has a .one
function
which is short for getting the first object if you iterate. As if you
would call next(project.trajectories)
. Note, that the iterator does
not ensure a fixed order. You literally might get any object, if there
is at least one.
In [39]:
trajectory = project.trajectories.one
Good, at least 100 frames. We pick, say, frame at index 28 (which is the
29th frame, we start counting at zero) using the way you pick an element
from a python list (which is almost what a Trajectory
represents, a
list of frames)
In [40]:
frame = trajectory[28]
print frame, frame.exists
Frame(sandbox:///{}/00000000/[28]) False
In [41]:
frame = trajectory[30]
print frame, frame.exists
Frame(sandbox:///{}/00000000/[30]) True
This part is important! We are running only one full atom trajectory with stride larger than one, so if we want to pick a frame from this trajectory you can pick in theory every frame, but only some of these really exist. If you want to restart from a frame this needs to be the case. Otherwise you run into trouble.
To run a trajectory just use the frame as the initial frame.
In [42]:
frame = trajectory[28]
task = project.new_trajectory(frame, 100, engine).run()
print task
None
In [43]:
frame = trajectory[30]
task = project.new_trajectory(frame, 100, engine).run()
print task
<adaptivemd.engine.engine.TrajectoryGenerationTask object at 0x113a6f1d0>
In [44]:
print task.description
Task: TrajectoryGenerationTask(OpenMMEngine) [created]
Sources
- staging:///integrator.xml
- staging:///alanine.pdb
- staging:///openmmrun.py
- sandbox:///{}/00000000/ [exists]
- staging:///system.xml
Targets
- sandbox:///{}/00000006/
Modified
<pretask>
Link('staging:///alanine.pdb' > 'worker://initial.pdb)
Link('staging:///system.xml' > 'worker://system.xml)
Link('staging:///integrator.xml' > 'worker://integrator.xml)
Link('staging:///openmmrun.py' > 'worker://openmmrun.py)
Link('sandbox:///{}/00000000/' > 'worker://source/)
mdconvert -o worker://input.pdb -i 3 -t worker://initial.pdb worker://source/master.dcd
Touch('worker://traj/')
python openmmrun.py -r --report-interval 1 -p CPU --types="{'protein':{'stride':1,'selection':'protein','filename':'protein.dcd'},'master':{'stride':10,'selection':null,'filename':'master.dcd'}}" -t worker://input.pdb --length 100 worker://traj/
Move('worker://traj/' > 'sandbox:///{}/00000006/)
<posttask>
See, how the actual frame picked in the mdconvert
line is -i 3
meaning index 3 which represents frame 30 with stride 10.
Now, run the task.
In [45]:
project.queue(task)
Btw, you can wait until something happens using
project.wait_until(condition)
. This is not so useful in notebooks,
but in scripts it does. condition
here is a function that evaluates
to True
or False
. it will be tested in regular intervals and
once it is True
the function returns.
In [46]:
project.wait_until(task.is_done)
In [47]:
task.state
Out[47]:
u'success'
Each Task
has a function is_done
that you can use. It will
return once a task is done. That means it either failed or succeeded or
was cancelled. Basically when it is not queued anymore.
If you want to run adaptively, all you need to do is to figure out where to start new simulations from and use the methods provided to run these.
Model
tasks¶
There are of course other things you can do besides creating new trajectories
In [48]:
from adaptivemd.analysis.pyemma import PyEMMAAnalysis
The instance to compute an MSM model of existing trajectories that you
pass it. It is initialized with a .pdb
file that is used to create
features between the \(c_\alpha\) atoms. This implementaton requires
a PDB but in general this is not necessay. It is specific to my
PyEMMAAnalysis show case.
In [49]:
modeller = PyEMMAAnalysis(
engine=engine,
outtype='protein',
features={'add_inverse_distances': {'select_Backbone': None}}
).named('pyemma')
Again we name it pyemma
for later reference.
The other two option chose which output type from the engine we want to analyse. We chose the protein trajectories since these are faster to load and have better time resolution.
The features dict expresses which features to use. In our case use all inverse distances between backbone c_alpha atoms.
A model generating task work similar to trajectories. You create the
generator with options (so far, this will become more complex in the
future) and then you create a Task
from passing it a list of
trajectories to be analyzed.
In [50]:
task = modeller.execute(list(project.trajectories))
project.queue(task)
In [51]:
project.wait_until(task.is_done)
In [52]:
for m in project.models:
print m
<adaptivemd.model.Model object at 0x113a62350>
So we generated one model. The Model
objects contain (in the base
version) only a .data
attribute which is a dictionary of information
about the generated model.
In [53]:
model = project.models.last
In [54]:
print model['msm']['P']
[[ 0.92105263 0.0122409 0. 0.06670647 0. ]
[ 0.00981369 0.90825688 0. 0.03976326 0.04216617]
[ 0. 0. 0.92405064 0.05817864 0.01777072]
[ 0.05301407 0.03941725 0.01509557 0.89247312 0. ]
[ 0. 0.06071783 0.0066979 0. 0.93258427]]
Pick frames automatically¶
The last thing that is implemented is a function that can utilize models to decide which frames are better to start from. The simplest one will use the counts per state, take the inverse and use this as a distribution.
In [55]:
project.find_ml_next_frame(4)
Out[55]:
[Frame(sandbox:///{}/00000004/[50]),
Frame(sandbox:///{}/00000002/[30]),
Frame(sandbox:///{}/00000002/[130]),
Frame(sandbox:///{}/00000002/[90])]
So you can pick states according to the newest (last) model. (This will be moved to the Brain). And since we want trajectories with these frames as starting points there is also a function for that
In [56]:
trajectories = project.new_ml_trajectory(length=100, number=4, engine=engine)
trajectories
Out[56]:
[Trajectory(Frame(sandbox:///{}/00000004/[50]) >> [0..100]),
Trajectory(Frame(sandbox:///{}/00000002/[30]) >> [0..100]),
Trajectory(Frame(sandbox:///{}/00000002/[20]) >> [0..100]),
Trajectory(Frame(sandbox:///{}/00000006/[80]) >> [0..100])]
Let’s submit these before we finish this notebook with a quick discussion of workers
In [57]:
project.queue(trajectories)
That’s it.
The Worker
objects¶
Worker are the instances that execute tasks for you. If you did not stop the worker in the command line it will still be running and you can check its state
In [58]:
project.trigger()
for w in project.workers:
if w.state == 'running':
print '[%s:%s] %s:%s' % (w.state, DT(w.seen).time, w.hostname, w.cwd)
[running:22:47:26] Stevie.fritz.box:/Users/jan-hendrikprinz/Studium/git/adaptivemd
Okay, the worker is running, was last reporting its heartbeat at … and
has a hostname and current working directory (where it was executed
from). The generators specify which tasks from some generators are
executed. If it is None
then the worker runs all tasks it finds. You
can use this to run specific workers for models and some for trajectory
generation.
You can also control it remotely by sending it a command. shutdown
will shut it down for you.
In [59]:
# project.workers.last.command = 'shutdown'
Afterwards you need to restart you worker to continue with this examples.
If you want to control Worker
objects look at the documentation.
In [60]:
project.close()
Example 3 - Adaptive¶
AdaptiveMD¶
Example 3 - Running an adaptive loop¶
In [1]:
import sys, os
In [2]:
from adaptivemd import (
Project,
File
)
# We need this to be part of the imports. You can only restore known objects
# Once these are imported you can load these objects.
from adaptivemd.engine.openmm import OpenMMEngine
from adaptivemd.analysis.pyemma import PyEMMAAnalysis
Let’s open our test
project by its name. If you completed the first
examples this should all work out of the box.
In [3]:
project = Project('tutorial')
Open all connections to the MongoDB
and Session
so we can get
started.
An interesting thing to note here is, that since we use a DB in the back, data is synced between notebooks. If you want to see how this works, just run some tasks in the last example, go back here and check on the change of the contents of the project.
Let’s see where we are. These numbers will depend on whether you run this notebook for the first time or just continue again. Unless you delete your project it will accumulate models and files over time, as is our ultimate goal.
In [4]:
print project.files
print project.generators
print project.models
<StoredBundle for with 19 file(s) @ 0x111db0a50>
<StoredBundle for with 3 file(s) @ 0x111db0a10>
<StoredBundle for with 1 file(s) @ 0x111db09d0>
Now restore our old ways to generate tasks by loading the previously used generators.
In [5]:
engine = project.generators['openmm']
modeller = project.generators['pyemma']
pdb_file = project.files['initial_pdb']
Run simulations¶
You are free to conduct your simulations from a notebook but normally you will use a script. The main point about adaptivity is to make decision about tasks along the way.
To make this happen we need Conditions
which are functions that
evaluate to True
or False
and once they are True
they cannot
change anymore back to False
. Like a one time on switch.
These are used to describe the happening of an event. We will now deal with some types of events.
We want to first look into a way to run python code asynchroneously in the project. For this, we write a function that should be executed. Inside you will create tasks and submit them.
If the function should pause, write yield {condition_to_continue}
.
This will interrupt your script until the function you return will
return True
when called. An example
In [6]:
def strategy(loops=10, trajs_per_loop=4, length=100):
for loop in range(loops):
# submit some trajectory tasks
trajectories = project.new_ml_trajectory(engine, length, trajs_per_loop)
tasks = map(engine.run, trajectories)
project.queue(tasks)
# continue if ALL of the tasks are done (can be failed)
yield [task.is_done for task in tasks]
# submit a model job
task = modeller.execute(list(project.trajectories))
project.queue(task)
# when it is done do next loop
yield task.is_done
and add the event to the project (these cannot be stored yet!)
In [7]:
project.add_event(strategy(loops=2))
Out[7]:
<adaptivemd.plan.ExecutionPlan at 0x111deaad0>
What is missing now? The adding of the event triggered the first part of the code. But to recheck if we should continue needs to be done manually.
RP has threads in the background and these can call the trigger whenever something changed or finished.
Still that is no problem, we can do that easily and watch what is happening
Let’s see how our project is growing. TODO: Add threading.Timer to auto trigger.
In [8]:
import time
from IPython.display import clear_output
In [11]:
try:
while project._events:
clear_output(wait=True)
print '# of files %8d : %s' % (len(project.trajectories), '#' * len(project.trajectories))
print '# of models %8d : %s' % (len(project.models), '#' * len(project.models))
sys.stdout.flush()
time.sleep(2)
project.trigger()
except KeyboardInterrupt:
pass
# of files 24 : ########################
# of models 4 : ####
Let’s do another round with more loops
In [10]:
project.add_event(strategy(loops=2))
Out[10]:
<adaptivemd.plan.ExecutionPlan at 0x111dfc750>
And some analysis (might have better functions for that)
In [12]:
# find, which frames from which trajectories have been chosen
trajs = project.trajectories
q = {}
ins = {}
for f in trajs:
source = f.frame if isinstance(f.frame, File) else f.frame.trajectory
ind = 0 if isinstance(f.frame, File) else f.frame.index
ins[source] = ins.get(source, []) + [ind]
for a,b in ins.iteritems():
print a.short, ':', b
file://{}/alanine.pdb : [0, 0, 0]
sandbox:///{}/00000009/ : [30]
sandbox:///{}/00000010/ : [0]
sandbox:///{}/00000006/ : [80, 90, 80]
sandbox:///{}/00000012/ : [70, 20]
sandbox:///{}/00000002/ : [30, 20, 0]
sandbox:///{}/00000013/ : [90, 20]
sandbox:///{}/00000018/ : [40, 40]
sandbox:///{}/00000022/ : [60]
sandbox:///{}/00000004/ : [50, 40, 40]
sandbox:///{}/00000000/ : [30]
sandbox:///{}/00000007/ : [90, 90]
And do this with multiple events in parallel.
In [13]:
def strategy2():
for loop in range(10):
num = len(project.trajectories)
task = modeller.execute(list(project.trajectories))
project.queue(task)
yield task.is_done
# continue only when there are at least 2 more trajectories
yield project.on_ntraj(num + 2)
In [14]:
project.add_event(strategy(loops=10, trajs_per_loop=2))
project.add_event(strategy2())
Out[14]:
<adaptivemd.plan.ExecutionPlan at 0x10bf7f310>
And now wait until all events are finished.
In [15]:
project.wait_until(project.events_done)
See, that we again reused our strategy.
In [16]:
project.close()
Example 4 - Tasks¶
AdaptiveMD¶
Example 4 - Custom Task
objects¶
In [1]:
import sys, os
In [2]:
from adaptivemd import (
Project, Task, File, PythonTask
)
Let’s open our test
project by its name. If you completed the first
examples this should all work out of the box.
In [3]:
project = Project('tutorial')
Open all connections to the MongoDB
and Session
so we can get
started.
Let’s see again where we are. These numbers will depend on whether you run this notebook for the first time or just continue again. Unless you delete your project it will accumulate models and files over time, as is our ultimate goal.
In [4]:
print project.files
print project.generators
print project.models
<StoredBundle for with 127 file(s) @ 0x112e3ba10>
<StoredBundle for with 3 file(s) @ 0x112e3b9d0>
<StoredBundle for with 25 file(s) @ 0x112e3b990>
Now restore our old ways to generate tasks by loading the previously used generators.
In [5]:
engine = project.generators['openmm']
modeller = project.generators['pyemma']
pdb_file = project.files['initial_pdb']
A simple task¶
A task is in essence a bash script-like description of what should be executed by the worker. It has details about files to be linked to the working directory, bash commands to be executed and some meta information about what should happen in case we succeed or fail.
Let’s first explain briefly how a task is executed and what its components are. This was originally build so that it is compatible with radical.pilot and still is. So, if you are familiar with it, all of the following information should sould very familiar.
A task is executed from within a unique directory that only exists for
this particular task. These are located in adaptivemd/workers/
and
look like
worker.0x5dcccd05097611e7829b000000000072L/
the long number is a hex representation of the UUID of the task. Just if you are curious type
print hex(my_task.__uuid__)
Then we change directory to this folder write a running.sh
bash
script and execute it. This script is created from the task definition
and also depends on your resource setting (which basically only contain
the path to the workers directory, etc)
The script is divided into 1 or 3 parts depending on which Task
class you use. The main Task
uses a single list of commands, while
PrePostTask
has the following structure
- Pre-Exec: Things to happen before the main command (optional)
- Main: the main commands are executed
- Post-Exec: Things to happen after the main command (optional)
Okay, lots of theory, now some real code for running a task that generated a trajectory
In [8]:
task = engine.run(project.new_trajectory(pdb_file, 100))
In [9]:
task.script
Out[9]:
[Link('staging:///alanine.pdb' > 'worker://initial.pdb),
Link('staging:///system.xml' > 'worker://system.xml),
Link('staging:///integrator.xml' > 'worker://integrator.xml),
Link('staging:///openmmrun.py' > 'worker://openmmrun.py),
Touch('worker://traj/'),
'python openmmrun.py -r --report-interval 1 -p CPU --types="{\'protein\':{\'stride\':1,\'selection\':\'protein\',\'filename\':\'protein.dcd\'},\'master\':{\'stride\':10,\'selection\':null,\'filename\':\'master.dcd\'}}" -t worker://initial.pdb --length 100 worker://traj/',
Move('worker://traj/' > 'sandbox:///{}/00000047/)]
We are linking a lot of files to the worker directory and change the
name for the .pdb in the process. Then call the actual python
script
that runs openmm. And finally move the output.dcd
and the restart
file back tp the trajectory folder.
There is a way to list lot’s of things about tasks and we will use it a lot to see our modifications.
In [10]:
print task.description
Task: TrajectoryGenerationTask(OpenMMEngine) [created]
Sources
- staging:///integrator.xml
- staging:///alanine.pdb
- staging:///openmmrun.py
- staging:///system.xml
Targets
- sandbox:///{}/00000047/
Modified
<pretask>
Link('staging:///alanine.pdb' > 'worker://initial.pdb)
Link('staging:///system.xml' > 'worker://system.xml)
Link('staging:///integrator.xml' > 'worker://integrator.xml)
Link('staging:///openmmrun.py' > 'worker://openmmrun.py)
Touch('worker://traj/')
python openmmrun.py -r --report-interval 1 -p CPU --types="{'protein':{'stride':1,'selection':'protein','filename':'protein.dcd'},'master':{'stride':10,'selection':null,'filename':'master.dcd'}}" -t worker://initial.pdb --length 100 worker://traj/
Move('worker://traj/' > 'sandbox:///{}/00000047/)
<posttask>
As long as a task is not saved and hence placed in the queue, it can be altered in any way. All of the 3 / 5 phases can be changed separately. You can add things to the staging phases or bash phases or change the command. So, let’s do that now
First, a Task
is very similar to a list of bash commands and you can
simply append (or prepend) a command. A text line will be interpreted as
a bash command.
In [11]:
task.append('echo "This new line is pointless"')
In [12]:
print task.description
Task: TrajectoryGenerationTask(OpenMMEngine) [created]
Sources
- staging:///integrator.xml
- staging:///alanine.pdb
- staging:///openmmrun.py
- staging:///system.xml
Targets
- sandbox:///{}/00000047/
Modified
<pretask>
Link('staging:///alanine.pdb' > 'worker://initial.pdb)
Link('staging:///system.xml' > 'worker://system.xml)
Link('staging:///integrator.xml' > 'worker://integrator.xml)
Link('staging:///openmmrun.py' > 'worker://openmmrun.py)
Touch('worker://traj/')
python openmmrun.py -r --report-interval 1 -p CPU --types="{'protein':{'stride':1,'selection':'protein','filename':'protein.dcd'},'master':{'stride':10,'selection':null,'filename':'master.dcd'}}" -t worker://initial.pdb --length 100 worker://traj/
Move('worker://traj/' > 'sandbox:///{}/00000047/)
echo "This new line is pointless"
<posttask>
As expected this line was added to the end of the script.
To set staging is more difficult. The reason is, that you normally have no idea where files are located and hence writing a copy or move is impossible. This is why the staging commands are not bash lines but objects that hold information about the actual file transaction to be done. There are some task methods that help you move files but also files itself can generate this commands for you.
Let’s move one trajectory (directory) around a little more as an example
In [13]:
traj = project.trajectories.one
In [14]:
transaction = traj.copy()
print transaction
Copy('sandbox:///{}/00000002/' > 'worker://)
This looks like in the script. The default for a copy is to move a file
or folder to the worker directory under the same name, but you can give
it another name/location if you use that as an argument. Note that since
trajectories are a directory you need to give a directory name (which
end in a /
)
In [15]:
transaction = traj.copy('new_traj/')
print transaction
Copy('sandbox:///{}/00000002/' > 'worker://new_traj/)
If you want to move it not to the worker directory you have to specify
the location and you can do so with the prefixes (shared://
,
sandbox://
, staging://
as explained in the previous examples)
In [16]:
transaction = traj.copy('staging:///cached_trajs/')
print transaction
Copy('sandbox:///{}/00000002/' > 'staging:///cached_trajs/)
Besides .copy
you can also .move
or .link
files.
In [17]:
transaction = pdb_file.copy('staging:///delete.pdb')
print transaction
transaction = pdb_file.move('staging:///delete.pdb')
print transaction
transaction = pdb_file.link('staging:///delete.pdb')
print transaction
Copy('file://{}/alanine.pdb' > 'staging:///delete.pdb)
Move('file://{}/alanine.pdb' > 'staging:///delete.pdb)
Link('file://{}/alanine.pdb' > 'staging:///delete.pdb)
Let’s mention these because they require special treatment. We cannot (like RP can) copy files to the HPC, we need to store them in the DB first.
In [18]:
new_pdb = File('file://../files/ntl9/ntl9.pdb').load()
Make sure you use file://
to indicate that you are using a local
file. The above example uses a relative path which will be replaced by
an absolute one, otherwise we ran into trouble once we open the project
at a different directory.
In [19]:
print new_pdb.location
file:///Users/jan-hendrikprinz/Studium/git/adaptivemd/examples/files/ntl9/ntl9.pdb
Note that now there are 3 /
in the filename, two from the ://
and one from the root directory of your machine
The load()
at the end really loads the file and when you save this
File
now it will contain the content of the file. You can access
this content as seen in the previous example.
In [20]:
print new_pdb.get_file()[:300]
CRYST1 50.000 50.000 50.000 90.00 90.00 90.00 P 1
ATOM 1 N MET 1 33.720 28.790 34.120 0.00 0.00 N
ATOM 2 H1 MET 1 33.620 29.790 33.900 0.00 0.00 H
ATOM 3 H2 MET 1 33.770 28.750 35.120 0.00 0.00
For local files you normally use .transfer
, but copy
, move
or link
work as well. Still, there is no difference since the file
only exists in the DB now and copying from the DB to a place on the HPC
results in a simple file creation.
Now, we want to add a command to the staging and see what happens.
In [21]:
transaction = new_pdb.transfer()
print transaction
Transfer('file://{}/ntl9.pdb' > 'worker://ntl9.pdb)
In [22]:
task.append(transaction)
In [23]:
print task.description
Task: TrajectoryGenerationTask(OpenMMEngine) [created]
Sources
- staging:///integrator.xml
- staging:///alanine.pdb
- file://{}/ntl9.pdb [exists]
- staging:///openmmrun.py
- staging:///system.xml
Targets
- sandbox:///{}/00000047/
Modified
<pretask>
Link('staging:///alanine.pdb' > 'worker://initial.pdb)
Link('staging:///system.xml' > 'worker://system.xml)
Link('staging:///integrator.xml' > 'worker://integrator.xml)
Link('staging:///openmmrun.py' > 'worker://openmmrun.py)
Touch('worker://traj/')
python openmmrun.py -r --report-interval 1 -p CPU --types="{'protein':{'stride':1,'selection':'protein','filename':'protein.dcd'},'master':{'stride':10,'selection':null,'filename':'master.dcd'}}" -t worker://initial.pdb --length 100 worker://traj/
Move('worker://traj/' > 'sandbox:///{}/00000047/)
echo "This new line is pointless"
Transfer('file://{}/ntl9.pdb' > 'worker://ntl9.pdb)
<posttask>
We now have one more transfer command. But something else has changed.
There is one more files listed as required. So, the task can only run,
if that file exists, but since we loaded it into the DB, it exists (for
us). For example the newly created trajectory 25.dcd
does not exist
yet. Would that be a requirement the task would fail. But let’s check
that it exists.
In [24]:
new_pdb.exists
Out[24]:
True
Okay, we have now the PDB file staged and so any real bash commands
could work with a file ntl9.pdb
. Alright, so let’s output its stats.
In [25]:
task.append('stat ntl9.pdb')
Note that usually you place these stage commands at the top or your script.
Now we could run this task, as before and see, if it works. (Make sure you still have a worker running)
In [26]:
project.queue(task)
And check, that the task is running
In [31]:
task.state
Out[31]:
u'success'
If we did not screw up the task, it should have succeeded and we can look at the STDOUT.
In [32]:
print task.stdout
23:00:30 [worker:3] stdout from running task
GO...
Reading PDB
Done
Initialize Simulation
Done.
('# platform used:', 'CPU')
('# temperature:', Quantity(value=300.0, unit=kelvin))
{"protein":{"stride":1,"selection":"protein","filename":"protein.dcd"},"master":{"stride":10,"selection":null,"filename":"master.dcd"}}
Writing stride 1 to file `protein.dcd` with selection `protein`
Writing stride 10 to file `master.dcd` with selection `None`
START SIMULATION
DONE
Written to directory `traj/`
This new line is pointless
16777220 101593151 -rw-r--r-- 1 jan-hendrikprinz staff 0 1142279 "May 26 23:00:28 2017" "May 26 23:00:26 2017" "May 26 23:00:26 2017" "May 26 23:00:26 2017" 4096 2232 0 ntl9.pdb
Well, great, we have the pointless output and the stats of the newly
staged file ntl9.pdb
Just for fun let’s create the same scheduler that the
adaptivemdworker
uses, but from inside this notebook.
In [33]:
from adaptivemd import WorkerScheduler
In [34]:
sc = WorkerScheduler(project.resource)
If you really wanted to use the worker you need to initialize it and it
will create directories and stage files for the generators, etc. For
that you need to call sc.enter(project)
, but since we only want it
to parse our tasks, we only set the project without invoking
initialization. You should normally not do that.
In [35]:
sc.project = project
Now we can use a function .task_to_script
that will parse a task
into a bash script. So this is really what would be run on your machine
now.
In [36]:
print '\n'.join(sc.task_to_script(task))
set -e
ln -s ../staging_area/alanine.pdb initial.pdb
ln -s ../staging_area/system.xml system.xml
ln -s ../staging_area/integrator.xml integrator.xml
ln -s ../staging_area/openmmrun.py openmmrun.py
mkdir -p traj/
python openmmrun.py -r --report-interval 1 -p CPU --types="{'protein':{'stride':1,'selection':'protein','filename':'protein.dcd'},'master':{'stride':10,'selection':null,'filename':'master.dcd'}}" -t initial.pdb --length 100 traj/
mkdir -p ../../projects/tutorial/trajs/00000047/
mv traj/* ../../projects/tutorial/trajs/00000047/
rm -r traj/
echo "This new line is pointless"
# write file `ntl9.pdb` from DB
stat ntl9.pdb
Now you see that all file paths have been properly interpreted to work. See that there is a comment about a temporary file from the DB that is then renamed. This is a little trick to be compatible with RPs way of handling files. (TODO: We might change this to just write to the target file. Need to check if that is still consistent)
One problem with bash scripts is that when you create the tasks you have no concept on where the files actually are located. To get around this the created bash script will be scanned for paths, that contain prefixed like we are used to and are interpreted in the context of the worker / scheduler. The worker is the only instance to know all that is necessary so this is the place to fix that problem.
Let’s see that in a little example, where we create an empty file in the staging area.
In [37]:
task = Task()
task.append('touch staging:///my_file.txt')
In [38]:
print '\n'.join(sc.task_to_script(task))
set -e
touch ../staging_area/my_file.txt
And voila, the path has changed to a relative path from the working directory of the worker. Note that you see here the line we added in the very beginning of example 1 to our resource!
If you want to start a new task you can begin with
In [39]:
task = Task()
as we did before.
Just start adding staging and bash commands and you are done. When you create a task you can assign it a generator, then the system will assume that this task was generated by that generator, so don’t do it for you custom tasks, unless you generated them in a generator. Setting this allows you to tell a worker only to run tasks of certain types.
The Python RPC Task¶
The tasks so far a very powerful, but they lack the possibility to call a python function. Since we are using python here, it would be great to really pretend to call a python function from here and not taking the detour of writing a python bash executable with arguments, etc… An example for this is the PyEmma generator which uses this capability.
Let’s do an example of this as well. Assume we have a python function in
a file (you need to have your code in a file so far so that we can copy
the file to the HPC if necessary). Let’s create the .py
file now.
In [40]:
%%file my_rpc_function.py
def my_func(f):
import os
print f
return os.path.getsize(f)
Overwriting my_rpc_function.py
Now create a PythonTask instead
In [41]:
task = PythonTask()
and the call function has changed. Note that also now you can still add
all the bash and stage commands as before. A PythonTask is also a
subclass of PrePostTask
so we have a .pre
and .post
phase
available.
In [42]:
from my_rpc_function import my_func
We call the function my_func
with one argument
In [43]:
task.call(my_func, f=project.trajectories.one)
In [44]:
print task.description
Task: PythonTask(NoneType) [created]
Sources
- file://{}/_rpc_input_0x3437ecd1425611e7ac1a000000000206L.json
- file://{}/my_rpc_function.py [exists]
- staging:///_run_.py
Targets
- file://{}/_rpc_output_0x3437ecd1425611e7ac1a000000000206L.json
Modified
<pretask>
Transfer('file://{}/_rpc_input_0x3437ecd1425611e7ac1a000000000206L.json' > 'worker://input.json)
Link('staging:///_run_.py' > 'worker://_run_.py)
Transfer('file://{}/my_rpc_function.py' > 'worker://my_rpc_function.py)
python _run_.py
Transfer('worker://output.json' > 'file://{}/_rpc_output_0x3437ecd1425611e7ac1a000000000206L.json)
<posttask>
Well, interesting. What this actually does is to write the input
arguments to the function into a temporary .json
file on the worker,
(in RP on the local machine and then transfers it to remote), rename it
to input.json
and read it in the _run_.py
. This is still a
little clumsy, but needs to be this way to be RP compatible which only
works with files! Look at the actual script.
You see, that we really copy the .py
file that contains the source
code to the worker directory. All that is done automatically. A little
caution on this. You can either write a function in a single file or use
any installed package, but in this case the same package needs to be
installed on the remote machine as well!
Let’s run it and see what happens.
In [45]:
project.queue(task)
And wait until the task is done
In [46]:
project.wait_until(task.is_done)
The default settings will automatically save the content from the
resulting output.json in the DB an you can access the data that was
returned from the task at .output
. In our example the result was
just the size of a the file in bytes
In [47]:
task.output
Out[47]:
170
And you can use this information in an adaptive script to make decisions.
The last thing we did not talk about is the possibility to also call a function with the returned data automatically on successful execution. Since this function is executed on the worker we (so far) only support function calls with the following restrictions.
- you can call a function of the related generator class. for this you
need to create the task using
PythonTask(generator)
- the function name you want to call is stored in
task.then_func_name
. So you can write a generator class with several possible outcomes and chose the function for each task. - The
Generator
needs to be part ofadaptivemd
So in the case of modeller.execute
we create a PythonTask
that
references the following functions
In [48]:
task = modeller.execute(project.trajectories)
In [49]:
task.then_func_name
Out[49]:
'then_func'
So we will call the default then_func
of modeller or the class
modeller is of.
In [50]:
help(modeller.then_func)
Help on function then_func in module adaptivemd.analysis.pyemma.emma:
then_func(project, task, data, inputs)
These callbacks are called with the current project, the resulting data
(which is in the modeller case a Model
object) and array of initial
inputs.
This is the actual code of the callback
@staticmethod
def then_func(project, task, model, inputs):
# add the input arguments for later reference
model.data['input']['trajectories'] = inputs['kwargs']['files']
model.data['input']['pdb'] = inputs['kwargs']['topfile']
project.models.add(model)
All it does is to add some of the input parameters to the model for later reference and then store the model in the project. You are free to define all sorts of actions here, even queue new tasks.
Next, we will talk about the factories for Task
objects, called
generators
. There we will actually write a new class that does some
stuff with the results.
In [51]:
project.close()
Example 5 - Generators¶
Custom Generator
objects¶
This example should guide you to build your own simple generator.
In [ ]:
from adaptivemd import (
Project, Task, File, PythonTask
)
project = Project('tutorial')
engine = project.generators['openmm']
modeller = project.generators['pyemma']
pdb_file = project.files['initial_pdb']
Basic knowledge¶
We assume that you have completed at least some of the previous examples and have a general idea of how adaptiveMD works. Still, let’s recapitulate what we think is the typical way of a simulation.
To execute something you need
- a description of the task to be done. This is the
Task
object. Once you have this you can, - use it in a
Scheduler
which will interpret theTask
into some code that the computer understands. It handles all the little things you expect from the task, like registering generated file, etc… And to do so, theScheduler
needs - your
Resource
description which acts like a config for the scheduler
When you have a Scheduler
(with Resource
) you let it execute
Task
objects. If you know how to build these you are done. That is
all you need.
What are Generator
s?¶
Build a task can be cumbersome and often repetative, and a factory for
Task
objects is extremely useful. These are called Generator
s
(maybe TaskFactory
) is a better name?!?
In your final scheme where you observe all generated objects and want to
build new tasks accordingly you will (almost) never build a Task
yourself. You use a generator.
A typical example is an Engine
. It will generate tasks, that
simulate new trajectories, extend existing ones, etc… Basic stuff. The
second big class is Analysis
. It will use trajectories to generate
models or properties of interest to guide your decisions for new
trajectories.
In this example we will build a simple generator for a task, that uses
the mdtraj
package to compute some features and store these in the
database and in a file.
The MDTrajFeaturizer
generator¶
First, we think about how this featurizer works if we would not use
adaptivemd
. The reason is, that we have basically two choices for
designing a Task
(see example 4 about Task
objects).
- A task that calls bash commands for you
- A task that calls a python function for you
Since we want to call mdtraj
functions we use the 2nd and start with
a skeleton for this type and store it under my_generator.py
In [ ]:
%%file my_generator.py
# This is an example for building your own generator
# This file must be added to the project so that it is loaded
# when you import `adaptivemd`. Otherwise your workers don't know
# about the class!
from adaptivemd import Generator
class MDTrajFeaturizer(Generator):
def __init__(self, {things we always need}):
super(PyEMMAAnalysis, self).__init__()
# stage file you want to reuse (optional)
# self['pdb_file'] = pdb_file
# stage = pdb_file.transfer('staging:///')
# self['pdb_file_stage'] = stage.target
# self.initial_staging.append(stage)
@staticmethod
def then_func(project, task, data, inputs):
# add the output for later reference
project.data.add(data)
def execute(self, {options per task}):
t = PythonTask(self)
# get your staged files (optional)
# input_pdb = t.link(self['pdb_file_stage'], 'input.pdb')
# add the python function call to your script (there can be only one!)
t.call(
my_script,
param1,
param2,
...
)
return t
def my_script(param1, param2, ...):
return {"whatever you want to return"}
Mdtraj needs a topology unless it is already present. Interestingly, our
Trajectory
objects know about their topology so we could access
these, if our function is to process a Trajectory
. This requires the
Trajectory
to be the input. If we want to process any file, then we
might need a topology.
The decision if we want the generator to work for a fixed topology is
yours. To show how this would work, we do this here. We use a fixed
topology per generator that applies to File
objects.
Second is the feature we want to compute. This is tricky and so we hard code this now. You can think of a better way to represent this. But let’s pick the tertiary stucture prediction
In [ ]:
def __init__(self, pdb_file=None):
super(PyEMMAAnalysis, self).__init__()
# if we provide a pdb_file it should be used
if pdb_file is not None:
# stage file you want to reuse (optional)
# give the file an internal name
self['pdb_file'] = pdb_file
# create the transfer from local to staging:
stage = pdb_file.transfer('staging:///')
# give the staged file an internal name
self['pdb_file_stage'] = stage.target
# append the transfer action to the initial staging action list
self.initial_staging.append(stage)
In [ ]:
def execute(self, file_to_analyze):
assert(isinstance(file_to_analyze, File))
t = PythonTask(self)
# get your staged files (optional)
if self.get('pdb_file_stage'):
input_pdb = t.link(self['pdb_file_stage'], 'input.pdb')
else:
input_pdb = None
# add the python function call to your script (there can be only one!)
t.call(
my_script,
file_to_analyze,
input_pdb
)
return t
This script is executed on the HPC for you. And requires mdtraj to be installed on it.
In [ ]:
def my_script(file_to_analyze, input_pdb):
import mdtraj as md
traj = md.load(file_to_analyze, top=input_pdb)
features = traj.compute_xyz()
return features
That’s it. At least in the simplest form. When you use this to create a
Task
In [ ]:
my_generator = MDTrajFeaturizer(pdb_file)
task = my_generator.execute(traj.file('master.dcd'))
project.queue(task)
We wait and then the Task
object has a .output
property which
now contains the returned result.
This can now be used in your execution plans…
In [ ]:
def strategy():
# generate some structures...
# yield wait ...
# get a traj object
task = my_generator.execute(traj.outputs('master'))
# wait until the task is done
yield task.is_done
# print the output
output = task.output
# do something with the result, store in the DB, etc...
Next, we look at improvements
Often you want to save the output from your function in the DB in some form or another. Though the output is stored, it is not conviniently accessed unless you know the task that was used.
For this reason there is a callback function you can set, that can take
care of doing a custom handling of the output. The function to be called
needs to be a method of the generator and you can give the task the name
of the method. The name (str) of the funtion can be set using the
then()
command. An the default name is then_func
.
In [ ]:
def execute(self, ...):
t = PythonTask(self)
t.then('handle_my_output')
@staticmethod
def handle_my_output(project, task, data, inputs):
print 'Saving data from task', task, 'into model'
m = Model(data)
project.model.add(m)
The function takes exactly 4 parameters
project
: the project in which the task was run. Is used to access the database, etctask
: the actual task object that produced the outputdata
: the output returned by the functioninputs
: the input to the python function call (internally). The data actually transmitted to the worker to run
Like in the above example you can do whatever you want with your data,
store it, alter it, write it to a file, etc. In case you do not want to
additionally save the output (data
) in the DB as an object, you can
tell the trask not to by setting
In [ ]:
def execute(self, ...):
t = PythonTask(self)
t.then('handle_my_output')
t.store_output = False # default is `True`
in that case .output
will stay None
even after execution
Trajectory
files and get their properties¶Note that you always have to write file generation and file analysis/reading that matches. We only store some very general properties of objects with them, e.g. a stride for trajectories. This means you cannot arbitrarily mix code for these.
Now we want that this works
In [ ]:
my_generator.execute(traj)
This is rather simple: All you need to do is to extract the actual files from the trajectory object.
In [ ]:
def __init__(self, outtype, pdb_file=None):
super(PyEMMAAnalysis, self).__init__()
# we store a str that holds the name of the outputtype
# this must match the definition
self.outtype = outtype
# ...
def execute(self, traj, *args, **kwargs):
t = PythonTask(self)
# ...
file_location = traj.outputs(self.outtype) # get the trajectory file matching outtype
# use the file_location.
# ...
Import! You have no access to the Trajectory
object in our
remove function. These will be converted to a real path relative to the
working directory. This makes sure that you will not have to deal with
prefixes, etc. This might change in the future, but. The scripts are
considered independent of adaptivemd!
This is not complicated but you need to briefly learn about the
mechanism to store complex Python objects in the DB. The general way to
Store an instance of a class requires you to subclass from
adaptivemd.mongodb.StorableMixin
. This provides the class with a
__uuid__
attribute that is a unique number for each storable object
that is given at creation time. (If we would just store objects using
pymongo we would get a number like this, but later). Secondly, it add
two functions
to_dict()
: this converts the (immutable) state of the object into a dictionary that is simple enough that it can be stored. Simple enought means, that you can have Python primitives, things like numpy arrays or even other storable objects, but not arbitrary objects in it, like lambda constructs (these are possible but need special treatment)from_dict()
: The reverse. It takes the dictionary fromto_dict
and must return an equivalent object!
So, you can do
clone = obj.__class__.from_dict(obj.to_dict())
and get an equal object in that it has the same attributes. You could also say a deep copy.
This is not always trivial and there exists a default implementation, which will make an additional assumption:
All necessary attributes have the same parameters in __init__
. So,
this would correspond to this rule
In [ ]:
class MyStorableObject(StorableMixin):
def __init__(self, state):
self.state = state
while this would not work
In [ ]:
class MyStorableObject(StorableMixin):
def __init__(self, initial_state):
self.state = initial_state
In the second case you need to overwrite the default function. All of these will work
In [ ]:
# fix `to_dict` to match default `from_dict`
class MyStorableObject(StorableMixin):
def __init__(self, initial_state):
self.state = initial_state
def to_dict(self):
return {
'initial_state': self.state
}
In [ ]:
# fix `from_dict` to match default `to_dict`
class MyStorableObject(StorableMixin):
def __init__(self, initial_state):
self.state = initial_state
@classmethod
def from_dict(cls, dct):
return cls(initial_state=dct['state'])
In [ ]:
# fix both `from_dict` and `to_dict`
class MyStorableObject(StorableMixin):
def __init__(self, initial_state):
self.state = initial_state
def to_dict(self):
return {
'my_state': self.state
}
@classmethod
def from_dict(cls, dct):
return cls(initial_state=dct['my_state'])
If you do that, make sure that you really capture all variables. Especially if you subclass from an existing one. You can use super to access the result from the parent class
In [ ]:
class MyStorableObject(StorableMixin):
@classmethod
def from_dict(cls, dct):
obj = super(MyStorableObject, cls).from_dict(dct)
obj.missing_attr1 = dct['missing_attr_key1']
return obj
def to_dict(self):
dct = super(MyStorableObject, self).to_dict(self)
dct.update({
'missing_attr_key1': self.missing_attr1
})
return dct
This is the recommended way to build your custom functions. For
completeness we show here what the base TaskGenerator
class will do
In [ ]:
@classmethod
def from_dict(cls, dct):
obj = cls.__new__(cls)
StorableMixin.__init__(obj)
obj._items = dct['_items']
obj.initial_staging = dct['initial_staging']
return obj
def to_dict(self):
return {
'_items': self._items,
'initial_staging': self.initial_staging
}
The only unfamiliar part is the
obj = cls.__new__(cls)
StorableMixin.__init__(obj)
which needs a little explanation.
In most __init__
functions for a TaskGenerator
you will
construct the initial_staging
attribute with some functions. If you
would reconstruct by just calling the constructor with the same
parameters again, this would result in an equal object as expected and
that would work, but not in all regards as expected: The problem is that
if you generate objects that can be stored, these will get new UUIDs and
hence are considered different from the ones that you wanted to store.
In short, the construction in the __init__
prevents you from getting
the real old object back, you always construct something new.
This can be solved by not using __init__
but creating an empty
object using __new__
and then fixing all attributes to the original
state. This is very similar to __setstate__
which we do not use in
general to still allow using __init__
which makes sense in most
cases where not storable objects are generated.
In the following we discuss an existing generator
A simple generator¶
A word about this example. While a Task
can be created and
configured a new class in adaptivemd
needs to be part of the
project. So we will write discuss the essential parts of the existing
code.
A generator is in essence a factory to create Task
objects with a
single command. A generator can be initialized with certain files that
the created tasks will always need, like an engine will need a topology
for each task, etc. It also (as explained briefly before in Example 4)
knows about certain callback behaviour of their tasks. Last, a generator
allows you to assign a worker only to tasks that were created by a
generator.
Let’s look at the code of the PyEMMAAnalysis
class PyEMMAAnalysis(Analysis):
def __init__(self, pdb_file):
super(PyEMMAAnalysis, self).__init__()
self['pdb_file'] = pdb_file
stage = pdb_file.transfer('staging:///')
self['pdb_file_stage'] = stage.target
self.initial_staging.append(stage)
@staticmethod
def then_func(project, task, model, inputs):
# add the input arguments for later reference
model.data['input']['trajectories'] = inputs['files']
model.data['input']['pdb'] = inputs['topfile']
project.models.add(model)
def execute(
self,
trajectories,
tica_lag=2,
tica_dim=2,
msm_states=5,
msm_lag=2,
stride=1):
t = PythonTask(self)
input_pdb = t.link(self['pdb_file_stage'], 'input.pdb')
t.call(
remote_analysis,
trajectories=list(trajectories),
topfile=input_pdb,
tica_lag=tica_lag,
tica_dim=tica_dim,
msm_states=msm_states,
msm_lag=msm_lag,
stride=stride
)
return t
def __init__(self, pdb_file):
# don't forget to call super
super(PyEMMAAnalysis, self).__init__()
# a generator also acts like a dictionary for files
# this way you can later access certain files you might need
# save the pdb_file under the same name
self['pdb_file'] = pdb_file
# this creates a transfer action like it is used in tasks
# and moves the passed pdb_file (usually on the local machein)
# to the staging_area root directory
stage = pdb_file.transfer('staging:///')
# and the new target file (which is also like the original)
# on the staging_area is saved unter `pdb_file_stage`
# so, we can access both files if we wanted to
# note that the original file most likely is in the DB
# so we could just skip the stage transfer completely
self['pdb_file_stage'] = stage.target
# last we add this transfer to the initial_staging which
# is done only once per used generator
self.initial_staging.append(stage)
# the kwargs is to keep the exmaple short, you should use explicit
# parameters and add appropriate docs
def execute(self, trajectories, **kwargs):
# create the task and set the generator to self, our new generator
t = PythonTask(self)
# we want to copy the staged file to the worker directory
# and name it `input.pdb`
input_pdb = t.link(self['pdb_file_stage'], 'input.pdb')
# if you chose not to use the staging file and copy it directly you
# would use in analogy
# input_pdb = t.link(self['pdb_file'], 'input.pdb')
# finally we use `.call` and want to call the `remote_analysis` function
# which we imported earlier from somewhere
t.call(
remote_analysis,
trajectories=list(trajectories),
**kwargs
)
return t
And finally a call_back function. The name then_func
is the default
function name to be called.
# we use a static method, but you can of course write a normal method
@staticmethod
# the call_backs take these arguments in this order
# the second parameter is actually a `Model` object in this case
# which has a `.data` attribute
def then_func(project, task, model, inputs):
# add the input arguments for later reference to the model
model.data['input']['trajectories'] = inputs['kwargs']['files']
model.data['input']['pdb'] = inputs['kwargs']['topfile']
# and save the model in the project
project.models.add(model)
A brief summary and things you need to set to make your generator work
class MyGenerator(Analysis):
def __init__(self, {things your generator always needs}):
super(MyGenerator, self).__init__()
# Add input files to self
self['file1'] = file1
# stage all files to the staging area of you want to keep these
# files on the HPC
for fn in ['file1', 'file2', ...]:
stage = self[fn].transfer('staging:///')
self[fn + '_stage'] = stage.target
self.initial_staging.append(stage)
@staticmethod
def then_func(project, task, outputs, inputs):
# do something with input and outputs
# store something in your project
def task_using_python_rpc(
self,
{arguments}):
t = PythonTask(self)
# set any task dependencies if you need
t.dependencies = []
input1 = t.link(self['file1'], 'alternative_name1')
input2 = t.link(self['file2'], 'alternative_name2')
...
# add whatever bash stuff you need BEFORE the function call
t.append('some bash command')
...
# use input1, etc in your function call if you like. It will
# be converted to a regular file location you can use
t.call(
{my_remote_python_function},
files=list(files),
)
# add whatever bash stuff you need AFTER the function call
t.append('some bash command')
...
return t
def task_using_bash_argument_call(
self,
{arguments}):
t = Task(self)
# set any task dependencies if you need
t.dependencies = []
input1 = t.link(self['file1'], 'alternative_name1')
input2 = t.link(self['file2'], 'alternative_name2')
...
# add more staging
t.append({action})
...
# add whatever bash stuff you want to do
t.append('some bash command')
...
# add whatever staging stuff you need AFTER the function call
t.append({action})
...
return t
The simplified code for the OpenMMEngine
class OpenMMEngine(Engine):
trajectory_ext = 'dcd'
def __init__(self, system_file, integrator_file, pdb_file, args=None):
super(OpenMMEngine, self).__init__()
self['pdb_file'] = pdb_file
self['system_file'] = system_file
self['integrator_file'] = integrator_file
self['_executable_file'] = exec_file
for fn in self.files:
stage = self[fn].transfer(Location('staging:///'))
self[name + '_stage'] = stage.target
self.initial_staging.append(stage)
if args is None:
args = '-p CPU --store-interval 1'
self.args = args
# this one only works if you start from a file
def task_run_trajectory_from_file(self, target):
# we create a special Task, that has some additional functionality
t = TrajectoryGenerationTask(self, target)
# link all the files we require
initial_pdb = t.link(self['pdb_file_stage'], Location('initial.pdb'))
t.link(self['system_file_stage'])
t.link(self['integrator_file_stage'])
t.link(self['_executable_file_stage'])
# use the initial PDB to be used
input_pdb = t.get(target.frame, 'coordinates.pdb')
# this represents our output trajectory
output = Trajectory('traj/', target.frame, length=target.length, engine=self)
# create the directory so openmmrun can write to it
t.touch(output)
# build the actual bash command
cmd = 'python openmmrun.py {args} -t {pdb} --length {length} {output}'.format(
pdb=input_pdb,
length=target.length,
output=output,
args=self.args,
)
t.append(cmd)
# copy the resulting trajectory directory back to the staging area
t.put(output, target)
return t
In [ ]:
project.close()
Example 6 - Multiple Output Types¶
AdaptiveMD¶
Example 6 - Multi-traj¶
In [1]:
import sys, os
Alright, let’s load the package and pick the Project
since we want
to start a project
In [2]:
from adaptivemd import Project
Let’s open a project with a UNIQUE name. This will be the name used in the DB so make sure it is new and not too short. Opening a project will always create a non-existing project and reopen an exising one. You cannot chose between opening types as you would with a file. This is a precaution to not accidentally delete your project.
In [3]:
# Use this to completely remove the example-worker project from the database.
Project.delete('tutorial-multi')
In [4]:
project = Project('tutorial-multi')
Now we have a handle for our project. First thing is to set it up to work on a resource.
What is a resource? A Resource
specifies a shared filesystem with
one or more clusteres attached to it. This can be your local machine or
just a regular cluster or even a group of cluster that can access the
same FS (like Titan, Eos and Rhea do).
Once you have chosen your place to store your results this way it is set
for the project and can (at least should) not be altered since all file
references are made to match this resource. Currently you can use the Fu
Berlin Allegro Cluster or run locally. There are two specific local
adaptations that include already the path to your conda installation.
This simplifies the use of openmm
or pyemma
.
Let us pick a local resource on a laptop for now.
In [5]:
from adaptivemd import LocalCluster, AllegroCluster
first pick your resource – where you want to run your simulation. Local or on Allegro
In [6]:
resource = LocalCluster()
In [7]:
project.initialize(resource)
TaskGenerators
¶TaskGenerators are instances whose purpose is to create tasks to be
executed. This is similar to the way Kernels work. A TaskGenerator will
generate Task
objects for you which will be translated into a
ComputeUnitDescription
and executed. In simple terms:
The task generator creates the bash scripts for you that run a simulation or run pyemma.
A task generator will be initialized with all parameters needed to make it work and it will now what needs to be staged to be used.
In [8]:
from adaptivemd.engine.openmm import OpenMMEngine
from adaptivemd import File, Directory
In [9]:
pdb_file = File('file://../files/alanine/alanine.pdb').named('initial_pdb').load()
In [10]:
engine = OpenMMEngine(
pdb_file=pdb_file,
system_file=File('file://../files/alanine/system.xml').load(),
integrator_file=File('file://../files/alanine/integrator.xml').load(),
args='-r --report-interval 1 -p CPU'
).named('openmm')
In [11]:
engine.add_output_type('master', 'master.dcd', 10)
engine.add_output_type('protein', 'protein.dcd', 1)
In [12]:
engine.types
Out[12]:
{'master': <adaptivemd.engine.engine.OutputTypeDescription at 0x10f7254d0>,
'protein': <adaptivemd.engine.engine.OutputTypeDescription at 0x10f725510>}
In [13]:
project.generators.add(engine)
In [14]:
s = engine._create_output_str()
print s
--types="{'protein':{'stride':1,'filename':'protein.dcd'},'master':{'stride':10,'filename':'master.dcd'}}"
In [15]:
task = project.new_trajectory(pdb_file, 100, engine=engine).run()
Trajectory
object¶
In [16]:
project.queue(task) # shortcut for project.tasks.add(task)
That is all we can do from here. To execute the tasks you need to run a worker using
adaptivemdworker -l tutorial --verbose
In [17]:
print project.tasks
<StoredBundle for with 2 file(s) @ 0x10f6e3e90>
In [18]:
task.trajectory
Out[18]:
Trajectory('alanine.pdb' >> [0..100])
In [21]:
task.state
Out[21]:
u'success'
In [22]:
t = project.trajectories.one
In [24]:
t.types['protein']
Out[24]:
<adaptivemd.engine.engine.OutputTypeDescription at 0x10f725510>
Once this is done, come back here and check your results. If you want you can execute the next cell which will block until the task has been completed.
In [25]:
print project.files
print project.trajectories
<StoredBundle for with 5 file(s) @ 0x10f6e3e50>
<ViewBundle for with 1 file(s) @ 0x10f6e3e10>
and close the project.
In [25]:
project.close()
The final project.close() will close the DB connection.
Resources¶
A Resource
specifies a shared filesystem with one or more clusteres
attached to it. This can be your local machine or just a regular cluster
or even a group of cluster that can access the same FS (like Titan, Eos
and Rhea do).
Once you have chosen your place to store your results t is set for the project and can (at least should) not be altered since all file references are made to match this resource.
Let us pick a local resource on your laptop or desktop machine; no cluster / HPC involved for now.
from adaptivemd import LocalResource
We now create the Resource object
resource = LocalResource()
Since this object defines the path where all files will be placed, let’s get the path to the shared folder. The one that can be accessed from all workers. On your local machine this is trivially the case.
resource.shared_path
'$HOME/adaptivemd/'
Okay, files will be placed in $HOME/adaptivemd/
. You can change this
using an option when creating the Resource
LocalCluster(shared_path='$HOME/my/adaptive/folder/')
Configuring your resource¶
Now you can add some additional paths, conda environment, etc, before we
setup the project. This works by setting a special task .wrapper
(see notebook 4 for more things you can do with Task
objects.)
resource.wrapper
<adaptivemd.task.DummyTask at 0x110d93d50>
In a nutshell, this dummy task has a .pre
and .post
list of
commands you can add any command you want to be executed before every
task you run.
resource.wrapper.pre.append('echo "Hello World"')
A task can also automatically add to the PATH
variable, set
environment variables and you can add conda environments
resource.wrapper.add_conda_env('my_env_python_27')
resource.wrapper.add_path('/x/y/z')
resource.wrapper.environment['CONDA'] = 'True'
print resource.wrapper.description
Task: DummyTask
<pre>
export PATH=/x/y/z:$PATH
export CONDA=True
echo "Hello World"
</pre>
<main />
<post>
</post>
Let’s reset that now and just add a little comment
resource = LocalResource()
resource.wrapper.pre.append('# This is part of the adaptivemd tutorial')
Finalize the Resource
¶
Last, we save our configured Resource
and initialize our empty
prohect with it. This is done once for a project and should not be
altered.
project.initialize(resource)
Classes¶
LocalResource ([shared_path, wrapper]) |
Run tasks locally and store results in $HOME/adaptivemd/ |
Files¶
The File
object. Instead of just a string, these are
used to represent files anywhere, on the cluster or your local
application. There are some subclasses or extensions of File
that
have additional meta information like Trajectory
or Frame
. The
underlying base object of a File
is called a Location
.
All of these objects share the location
property. A string that
represents a location for a file in general.
f = File('system.pdb')
This representation is so far useless unless we specify where this file is located. It could be on the HPC somewhere or on the local computer. To do that we use prefixes
{drive}://{relative_path}
or{drive}:///{absolute_path}
(for local files)
You can use the following prefixes
file://
points to files on your local machine.worker://
specifies files on the current working directory of the executing node. Usually these are temprary files for a single execution.shared://
specifies the root shared FS directory (e.g.NO_BACKUP/
on Allegro) Use this to import and export files that are already on the cluster.staging://
a special scheduler-specific caching directory. Use this to relate to files that should be reused, but not stored long-time. A typical example is a PDB file. This is required by every simulation but an input file. You want to copy it once to the cluster and use it over and over.sandbox://
this is a specia folder where all temporary worker directories are located. It also contains the session folders for RP.project://
this folder contains all the project data for your current project and is the place where all the data should be stored for long-time storage
Later you might want to transfer a file from a project folder to the current working directory (whereever this will be) and you would specify locations in this way
project://models/my_model.json >> worker://input_model.json
We start with a first PDB file that is located on this machine at a relative path
pdb_file = File('file://../files/alanine/alanine.pdb')
File
like any complex object in adaptivemd can have a .name
attribute that makes them easier to find later. You can either set the
.name
property after creation, or use a little helper method
.named()
to get a one-liner. This function will set .name
and
return itself.
pdb_file.name = 'initial_pdb'
The .load()
at the end is important. It causes the File
object
to load the content of the file and if you save the File
object, the
actual file is stored with it. This way it can simply be rewritten on
the cluster or anywhere else.
pdb_file.load()
'alanine.pdb'
Now you can access the content
print pdb_file.get_file()[:500]
REMARK 1 CREATED WITH MDTraj 1.8.0, 2016-12-22
CRYST1 26.063 26.063 26.063 90.00 90.00 90.00 P 1 1
MODEL 0
ATOM 1 H1 ACE A 1 -1.900 1.555 26.235 1.00 0.00 H
ATOM 2 CH3 ACE A 1 -1.101 2.011 25.651 1.00 0.00 C
ATOM 3 H2 ACE A 1 -0.850 2.954 26.137 1.00 0.00 H
ATOM 4 H3 ACE A 1 -1.365 2.132 24.600 1.00 0.00 H
ATOM 5 C ACE A 1 0.182
There are a few other things that you can access from a file. There is a time when it was initiated (like any storable object).
print 'timestamp', pdb_file.__time__
print 'uuid', hex(pdb_file.__uuid__)
timestamp 1490777436
uuid 0x5eadd73145711e7a9d3000000000042L
Access the drive (prefix)
print pdb_file.drive
file
Get the path on the drive (see we have converted the relative path to an absolute)
print '...' + pdb_file.dirname[35:]
.../adaptivemd/examples/files/alanine
or the basename
print pdb_file.basename
alanine.pdb
Classes¶
Location (location) |
A representation of a path in adaptiveMD |
File (location) |
Represents a file object at a specific location |
Trajectory (location, frame, length[, engine]) |
Represents a trajectory File on the cluster |
Frame (trajectory, index) |
Represents a frame of a trajectory |
JSONFile (location) |
A special file which as assumed JSON readable content |
DataDict (data) |
Delegate to the contained .data object |
Bundles¶
A Bundle
- A set-enhancement to add filtering and store handling capabilities
Bundles can be accessed like a normal set using iteration. You can add objects
using .add(item)
if the bundle is not a view
Examples¶
Some basic functions
bundle = Bundle(['10', '20', 1, 2, 3])
str_view = bundle.c(basestring) # only how strings
print list(str_view) # ['10', '20']
fnc_view = bundle.v(lambda x: int(x) < 3)
print list(fnc_view) # [1, 2]
Some File
specific functions
import adaptivemd as amd
bundle = Bundle([amd.File('0.dcd'), amd.File('a.pdb')])
file_view = bundle.f('*.dcd')
print list(file_view) # [File('0.dcd')]
Logic operations produce view on the resulting bundle
and_bundle = str_view & fnc_view
print list(and_bundle) # []
and_bundle = str_view | fnc_view
print list(and_bundle) # [1, 2, '10', '20']
A StoredBundle
is attached to a mongodb store (a stored object list).
Adding will append the object to the store if not stored yet. All iteration
and views will always be kept synced with the DB store content.
p = amd.Project('test-project')
store = StoredBundle() # new bundle
store.set_store(p.storage.trajectories) # attach to DB
print list(store) # show all trajectories
len_store = store.v(lambda x: len(x) > 10) # all trajs with len > 10
print list(len_store)
Set do not have ordering so some functions do not make sense. As long as
you are working with storable objects (subclassed from adaptivemd.mongodb.StorableMixin
)
you have some time-ordering (accurate to seconds)
print store.first # get the earlist created object
print store.one # get one (any) single object
print store.last # get the last created object
A bundle is mostly meant to work with storable objects (but does not have to)
To simplify access to certain attributes or apply function to all members you
can use the BaseBundle.all()
attribute and get a delegator that will
apply an attribute or method to all objects
print len_store.all.length # print all lengths of all objects in len_store
print store.all.path # print all path of all trajectories
# call `.execute('shutdown') on all workers in the `.workers` bundle
print p.workers.all.execute('shutdown')
Classes¶
Bundle ([iterable]) |
A container of objects |
StoredBundle () |
A stored bundle in a mongodb |
SortedBundle (bundle, key) |
Sorted view of a bundle |
ViewBundle (bundle, view) |
A view on a bundle where object are filtered by a bool function |
BaseBundle |
BaseClass for Bundle functionality a special set of storable objects |
LogicBundle (bundle1, bundle2) |
Implement simple and and or logic for bundles |
AndBundle (bundle1, bundle2) |
And logic |
OrBundle (bundle1, bundle2) |
Or logic |
BundleDelegator (bundle) |
Delegate an attribute call to all elements in a bundle |
FunctionDelegator (bundle, item) |
Delegate a function call to all elements in a bundle |
Actions¶
Actions are descriptions for executions on the HPC - basically a bash command
Classes¶
Action () |
A bash-command-like action to be executed in a Task |
FileAction (source) |
An Action that involves (at least) one file called source |
FileTransaction (source, target) |
An action involving a source and a target file |
Copy (source, target) |
An action that copies a file from source to target |
Move (source, target) |
An action that moves a file from source to target |
Link (source, target) |
An action that links a source file to a target |
Touch (source) |
An action that creates an empty file or folder |
Remove (source) |
An action that removes a file |
MakeDir (source) |
An action that creates a folder |
Task¶
A Task
is in essence a bash script-like description of what should be
executed by the worker. It has details about files to be linked to the
working directory, bash commands to be executed and some meta
information about what should happen in case we succeed or fail.
The execution structure¶
Let’s first explain briefly how a task is executed and what its components are. This was originally build so that it is compatible with radical.pilot and still is. So, if you are familiar with it, all of the following information should sould very familiar.
A task is executed from within a unique directory that only exists for
this particular task. These are located in adaptivemd/workers/
and
look like
worker.0x5dcccd05097611e7829b000000000072L/
the long number is a hex representation of the UUID of the task. Just if you are curious type
print hex(my_task.__uuid__)
Then we change directory to this folder write a running.sh
bash
script and execute it. This script is created from the task definition
and also depends on your resource setting (which basically only contain
the path to the workers directory, etc)
The script is divided into 1 or 3 parts depending on which Task
class you use. The main Task
uses a single list of commands, while
PrePostTask
has the following structure
- Pre-Exec: Things to happen before the main command (optional)
- Main: the main commands are executed
- Post-Exec: Things to happen after the main command (optional)
Okay, lots of theory, now some real code for running a task that generated a trajectory
task = engine.task_run_trajectory(project.new_trajectory(pdb_file, 100))
task.script
[Link('staging:///alanine.pdb' > 'worker://initial.pdb),
Link('staging:///system.xml' > 'worker://system.xml),
Link('staging:///integrator.xml' > 'worker://integrator.xml),
Link('staging:///openmmrun.py' > 'worker://openmmrun.py),
Touch('worker://traj/'),
'python openmmrun.py -r --report-interval 1 -p CPU --store-interval 1 -t worker://initial.pdb --length 100 worker://traj/',
Move('worker://traj/' > 'sandbox:///{}/00000076/)]
We are linking a lot of files to the worker directory and change the
name for the .pdb in the process. Then call the actual python
script
that runs openmm. And finally move the output.dcd
and the restart
file back tp the trajectory folder.
There is a way to list lot’s of things about tasks and we will use it a lot to see our modifications.
print task.description
Task: TrajectoryGenerationTask(OpenMMEngine) [created]
Sources
- staging:///integrator.xml
- staging:///alanine.pdb
- staging:///openmmrun.py
- staging:///system.xml
Targets
- sandbox:///{}/00000076/
Modified
<pretask>
Link('staging:///alanine.pdb' > 'worker://initial.pdb)
Link('staging:///system.xml' > 'worker://system.xml)
Link('staging:///integrator.xml' > 'worker://integrator.xml)
Link('staging:///openmmrun.py' > 'worker://openmmrun.py)
Touch('worker://traj/')
python openmmrun.py -r --report-interval 1 -p CPU --store-interval 1 -t worker://initial.pdb --length 100 worker://traj/
Move('worker://traj/' > 'sandbox:///{}/00000076/)
<posttask>
Modify a task¶
As long as a task is not saved and hence placed in the queue, it can be altered in any way. All of the 3 / 5 phases can be changed separately. You can add things to the staging phases or bash phases or change the command. So, let’s do that now
Add a bash line¶
First, a Task
is very similar to a list of bash commands and you can
simply append (or prepend) a command. A text line will be interpreted as
a bash command.
task.append('echo "This new line is pointless"')
print task.description
Task: TrajectoryGenerationTask(OpenMMEngine) [created]
Sources
- staging:///integrator.xml
- staging:///alanine.pdb
- staging:///openmmrun.py
- staging:///system.xml
Targets
- sandbox:///{}/00000076/
Modified
<pretask>
Link('staging:///alanine.pdb' > 'worker://initial.pdb)
Link('staging:///system.xml' > 'worker://system.xml)
Link('staging:///integrator.xml' > 'worker://integrator.xml)
Link('staging:///openmmrun.py' > 'worker://openmmrun.py)
Touch('worker://traj/')
python openmmrun.py -r --report-interval 1 -p CPU --store-interval 1 -t worker://initial.pdb --length 100 worker://traj/
Move('worker://traj/' > 'sandbox:///{}/00000076/)
echo "This new line is pointless"
<posttask>
As expected this line was added to the end of the script.
Add staging actions¶
To set staging is more difficult. The reason is, that you normally have no idea where files are located and hence writing a copy or move is impossible. This is why the staging commands are not bash lines but objects that hold information about the actual file transaction to be done. There are some task methods that help you move files but also files itself can generate this commands for you.
Let’s move one trajectory (directory) around a little more as an example
traj = project.trajectories.one
transaction = traj.copy()
print transaction
Copy('sandbox:///{}/00000010/' > 'worker://)
This looks like in the script. The default for a copy is to move a file
or folder to the worker directory under the same name, but you can give
it another name/location if you use that as an argument. Note that since
trajectories are a directory you need to give a directory name (which
end in a /
)
transaction = traj.copy('new_traj/')
print transaction
Copy('sandbox:///{}/00000010/' > 'worker://new_traj/)
If you want to move it not to the worker directory you have to specify
the location and you can do so with the prefixes (shared://
,
sandbox://
, staging://
as explained in the previous examples)
transaction = traj.copy('staging:///cached_trajs/')
print transaction
Copy('sandbox:///{}/00000010/' > 'staging:///cached_trajs/)
Besides .copy
you can also .move
or .link
files.
transaction = pdb_file.copy('staging:///delete.pdb')
print transaction
transaction = pdb_file.move('staging:///delete.pdb')
print transaction
transaction = pdb_file.link('staging:///delete.pdb')
print transaction
Copy('file://{}/alanine.pdb' > 'staging:///delete.pdb)
Move('file://{}/alanine.pdb' > 'staging:///delete.pdb)
Link('file://{}/alanine.pdb' > 'staging:///delete.pdb)
Local files¶
Let’s mention these because they require special treatment. We cannot (like RP can) copy files to the HPC, we need to store them in the DB first.
new_pdb = File('file://../files/ntl9/ntl9.pdb').load()
Make sure you use file://
to indicate that you are using a local
file. The above example uses a relative path which will be replaced by
an absolute one, otherwise we ran into trouble once we open the project
at a different directory.
print new_pdb.location
file:///Users/jan-hendrikprinz/Studium/git/adaptivemd/examples/files/ntl9/ntl9.pdb
Note that now there are 3 /
in the filename, two from the ://
and one from the root directory of your machine
The load()
at the end really loads the file and when you save this
File
now it will contain the content of the file. You can access
this content as seen in the previous example.
print new_pdb.get_file()[:300]
CRYST1 50.000 50.000 50.000 90.00 90.00 90.00 P 1
ATOM 1 N MET 1 33.720 28.790 34.120 0.00 0.00 N
ATOM 2 H1 MET 1 33.620 29.790 33.900 0.00 0.00 H
ATOM 3 H2 MET 1 33.770 28.750 35.120 0.00 0.00
For local files you normally use .transfer
, but copy
, move
or link
work as well. Still, there is no difference since the file
only exists in the DB now and copying from the DB to a place on the HPC
results in a simple file creation.
Now, we want to add a command to the staging and see what happens.
transaction = new_pdb.transfer()
print transaction
Transfer('file://{}/ntl9.pdb' > 'worker://ntl9.pdb)
task.append(transaction)
print task.description
Task: TrajectoryGenerationTask(OpenMMEngine) [created]
Sources
- staging:///integrator.xml
- staging:///alanine.pdb
- staging:///openmmrun.py
- file://{}/ntl9.pdb [exists]
- staging:///system.xml
Targets
- sandbox:///{}/00000076/
Modified
<pretask>
Link('staging:///alanine.pdb' > 'worker://initial.pdb)
Link('staging:///system.xml' > 'worker://system.xml)
Link('staging:///integrator.xml' > 'worker://integrator.xml)
Link('staging:///openmmrun.py' > 'worker://openmmrun.py)
Touch('worker://traj/')
python openmmrun.py -r --report-interval 1 -p CPU --store-interval 1 -t worker://initial.pdb --length 100 worker://traj/
Move('worker://traj/' > 'sandbox:///{}/00000076/)
echo "This new line is pointless"
Transfer('file://{}/ntl9.pdb' > 'worker://ntl9.pdb)
<posttask>
We now have one more transfer command. But something else has changed.
There is one more files listed as required. So, the task can only run,
if that file exists, but since we loaded it into the DB, it exists (for
us). For example the newly created trajectory 25.dcd
does not exist
yet. Would that be a requirement the task would fail. But let’s check
that it exists.
new_pdb.exists
True
Okay, we have now the PDB file staged and so any real bash commands
could work with a file ntl9.pdb
. Alright, so let’s output its stats.
task.append('stat ntl9.pdb')
Note that usually you place these stage commands at the top or your script.
Now we could run this task, as before and see, if it works. (Make sure you still have a worker running)
project.queue(task)
And check, that the task is running
task.state
u'success'
If we did not screw up the task, it should have succeeded and we can look at the STDOUT.
print task.stdout
13:11:19 [worker:3] stdout from running task GO... Reading PDB Done Initialize Simulation Done. ('# platform used:', 'CPU') ('# temperature:', Quantity(value=300.0, unit=kelvin)) START SIMULATION DONE Written to directory traj/ This new line is pointless 16777220 97338745 -rw-r--r-- 1 jan-hendrikprinz staff 0 1142279 "Mar 21 13:11:18 2017" "Mar 21 13:11:15 2017" "Mar 21 13:11:15 2017" "Mar 21 13:11:15 2017" 4096 2232 0 ntl9.pdb
Well, great, we have the pointless output and the stats of the newly
staged file ntl9.pdb
How does a real script look like¶
Just for fun let’s create the same scheduler that the
adaptivemdworker
uses, but from inside this notebook.
from adaptivemd import WorkerScheduler
sc = WorkerScheduler(project.resource)
If you really wanted to use the worker you need to initialize it and it
will create directories and stage files for the generators, etc. For
that you need to call sc.enter(project)
, but since we only want it
to parse our tasks, we only set the project without invoking
initialization. You should normally not do that.
sc.project = project
Now we can use a function .task_to_script
that will parse a task
into a bash script. So this is really what would be run on your machine
now.
print '\n'.join(sc.task_to_script(task))
set -e # This is part of the adaptivemd tutorial ln -s ../staging_area/alanine.pdb initial.pdb ln -s ../staging_area/system.xml system.xml ln -s ../staging_area/integrator.xml integrator.xml ln -s ../staging_area/openmmrun.py openmmrun.py mkdir -p traj/ python openmmrun.py -r --report-interval 1 -p CPU --store-interval 1 -t initial.pdb --length 100 traj/ mkdir -p ../../projects/tutorial/trajs/00000076/ mv traj/* ../../projects/tutorial/trajs/00000076/ rm -r traj/ echo "This new line is pointless" # write file ntl9.pdb from DB stat ntl9.pdb
Now you see that all file paths have been properly interpreted to work. See that there is a comment about a temporary file from the DB that is then renamed. This is a little trick to be compatible with RPs way of handling files. (TODO: We might change this to just write to the target file. Need to check if that is still consistent)
A note on file locations¶
One problem with bash scripts is that when you create the tasks you have no concept on where the files actually are located. To get around this the created bash script will be scanned for paths, that contain prefixed like we are used to and are interpreted in the context of the worker / scheduler. The worker is the only instance to know all that is necessary so this is the place to fix that problem.
Let’s see that in a little example, where we create an empty file in the staging area.
task = Task()
task.append('touch staging:///my_file.txt')
print '\n'.join(sc.task_to_script(task))
set -e
# This is part of the adaptivemd tutorial
touch ../staging_area/my_file.txt
And voila, the path has changed to a relative path from the working directory of the worker. Note that you see here the line we added in the very beginning of example 1 to our resource!
A Task from scratch¶
If you want to start a new task you can begin with
task = Task()
as we did before.
Just start adding staging and bash commands and you are done. When you create a task you can assign it a generator, then the system will assume that this task was generated by that generator, so don’t do it for you custom tasks, unless you generated them in a generator. Setting this allows you to tell a worker only to run tasks of certain types.
The Python RPC Task¶
The tasks so far a very powerful, but they lack the possibility to call a python function. Since we are using python here, it would be great to really pretend to call a python function from here and not taking the detour of writing a python bash executable with arguments, etc… An example for this is the PyEmma generator which uses this capability.
Let’s do an example of this as well. Assume we have a python function in
a file (you need to have your code in a file so far so that we can copy
the file to the HPC if necessary). Let’s create the .py
file now.
%%file my_rpc_function.py
def my_func(f):
import os
print f
return os.path.getsize(f)
Overwriting my_rpc_function.py
Now create a PythonTask instead
task = PythonTask()
and the call function has changed. Note that also now you can still add
all the bash and stage commands as before. A PythonTask is also a
subclass of PrePostTask
so we have a .pre
and .post
phase
available.
from my_rpc_function import my_func
We call the function my_func
with one argument
task.call(my_func, f=project.trajectories.one)
print task.description
Task: PythonTask(NoneType) [created]
Sources
- staging:///_run_.py
- file://{}/_rpc_input_0x71bdd2d10e2f11e7a0f00000000002eaL.json
- file://{}/my_rpc_function.py [exists]
Targets
- file://{}/_rpc_output_0x71bdd2d10e2f11e7a0f00000000002eaL.json
Modified
<pretask>
Transfer('file://{}/_rpc_input_0x71bdd2d10e2f11e7a0f00000000002eaL.json' > 'worker://input.json)
Link('staging:///_run_.py' > 'worker://_run_.py)
Transfer('file://{}/my_rpc_function.py' > 'worker://my_rpc_function.py)
python _run_.py
Transfer('worker://output.json' > 'file://{}/_rpc_output_0x71bdd2d10e2f11e7a0f00000000002eaL.json)
<posttask>
Well, interesting. What this actually does is to write the input
arguments to the function into a temporary .json
file on the worker,
(in RP on the local machine and then transfers it to remote), rename it
to input.json
and read it in the _run_.py
. This is still a
little clumsy, but needs to be this way to be RP compatible which only
works with files! Look at the actual script.
You see, that we really copy the .py
file that contains the source
code to the worker directory. All that is done automatically. A little
caution on this. You can either write a function in a single file or use
any installed package, but in this case the same package needs to be
installed on the remote machine as well!
Let’s run it and see what happens.
project.queue(task)
And wait until the task is done
project.wait_until(task.is_done)
The default settings will automatically save the content from the
resulting output.json in the DB an you can access the data that was
returned from the task at .output
. In our example the result was
just the size of a the file in bytes
task.output
136
And you can use this information in an adaptive script to make decisions.
success callback¶
The last thing we did not talk about is the possibility to also call a function with the returned data automatically on successful execution. Since this function is executed on the worker we (so far) only support function calls with the following restrictions.
- you can call a function of the related generator class. for this you
need to create the task using
PythonTask(generator)
- the function name you want to call is stored in
task.then_func_name
. So you can write a generator class with several possible outcomes and chose the function for each task. - The
Generator
needs to be part ofadaptivemd
So in the case of modeller.execute
we create a PythonTask
that
references the following functions
task = modeller.execute(project.trajectories)
task.then_func_name
'then_func'
So we will call the default then_func
of modeller or the class
modeller is of.
help(modeller.then_func)
Help on function then_func in module adaptivemd.analysis.pyemma.emma:
then_func(project, task, model, inputs)
These callbacks are called with the current project, the resulting data
(which is in the modeller case a Model
object) and array of initial
inputs.
This is the actual code of the callback
@staticmethod
def then_func(project, task, model, inputs):
# add the input arguments for later reference
model.data['input']['trajectories'] = inputs['kwargs']['files']
model.data['input']['pdb'] = inputs['kwargs']['topfile']
project.models.add(model)
All it does is to add some of the input parameters to the model for later reference and then store the model in the project. You are free to define all sorts of actions here, even queue new tasks.
Task ([generator]) |
A description for a task running on an HPC |
PythonTask ([generator]) |
A special task that does a RPC python calls |
Engines¶
The Trajectory
object¶
Before we talk about adaptivity, let’s have a look at possibilities to generate trajectories.
We assume that you successfully ran a first trajectory using a worker. Next, we talk about lot’s of ways to generate new trajectories.
You will do this in the beginning. Remember we already have a PDB stored from setting up the engine. if you want to start from this configuration do as before
- create the
Trajectory
object you want - make a task
- submit the task to craft the object into existance on the HPC
A trajectory contains all necessary information to make itself. It has
- a (hopefully unique) location: This will we the folder where all the files that belong to the trajectory go.
- an initial frame: the initial configuration to be used to tell the MD simulation package where to start
- a length in frames to run
- the
Engine
: the actual engine I want to use to create the trajectory.
Note, the Engine
is technically not required unless you want to use
.run()
but it makes sense, because the engine contains information
about the topology and, more importantly information about which output
files are generated. This is the essential information you will need for
analysis, e.g. what is the filename of the trajectory file that contains
the protein structure and what is its stride?
Let’s first build a Trajectory
from scratch
file_name = next(project.traj_name) # get a unique new filename
trajectory = Trajectory(
location=file_name, # this creates a new filename
frame=pdb_file, # initial frame is the PDB
length=100, # length is 100 frames
engine=engine # the engine to be used
)
Since this is tedious to write there is a shortcut
trajectory = project.new_trajectory(
frame=pdb_file,
length=100,
engine=engine,
number=1 # if more then one you get a list of trajectories
)
Like in the first example, now that we have the parameters of the
Trajectory
we can create the task to do that.
OpenMMEngine¶
Let’s do an example for an OpenMM engine. This is simply a small python script that makes OpenMM look like a executable. It run a simulation by providing an initial frame, OpenMM specific system.xml and integrator.xml files and some additional parameters like the platform name, how often to store simulation frames, etc.
engine = OpenMMEngine(
pdb_file=pdb_file,
system_file=File('file://../files/alanine/system.xml').load(),
integrator_file=File('file://../files/alanine/integrator.xml').load(),
args='-r --report-interval 1 -p CPU'
).named('openmm')
We have now an OpenMMEngine which uses the previously made pdb File
object and uses the location defined in there. The same for the OpenMM
XML files and some args to run using the CPU
kernel, etc.
Last we name the engine openmm
to find it later.
engine.name
Next, we need to set the output types we want the engine to generate. We
chose a stride of 10 for the master
trajectory without selection and
a second trajectory with only protein atoms and native stride.
Note that the stride and all frame number ALWAYS refer to the native
steps used in the engine. In out example the engine uses 2fs
time
steps. So master stores every 20fs
and protein every 2fs
engine.add_output_type('master', 'master.dcd', stride=10)
engine.add_output_type('protein', 'protein.dcd', stride=1, selection='protein')
Classes¶
Engine () |
An generator for trajectory simulation tasks |
Trajectory (location, frame, length[, engine]) |
Represents a trajectory File on the cluster |
OpenMMEngine (system_file, integrator_file, …) |
OpenMM Engine to be used by Adaptive MD |
Generators¶
TaskGenerators are instances whose purpose is to create tasks to be
executed. This is similar to the way Kernels work. A TaskGenerator will
generate Task
objects for you which will be translated into a
radical.pilot.ComputeUnitDescription
and executed. In simple terms:
The task generator creates the bash scripts for you that run a task.
A task generator will be initialized with all parameters needed to make it work and it will now what needs to be staged to be used.
Add generators to project¶
To add a generator to the project for later usage. You pick the
Project.generators()
store and just Bundle.add()
it.
Consider a store to work like a set()
in python. It contains
objects only once and is not ordered.
Therefore we need a name to find the objects later. Of course you can
always iterate over all objects, but the order is not given.
To be precise there is an order in the time of creation of the object, but it is only accurate to seconds and it really is the time it was created and not stored.
project.generators.add(engine)
project.generators.add(modeller)
Note, that you cannot add the same engine twice. But if you create a new engine it will be considered different and hence you can store it again.
Classes¶
TaskGenerator () |
A generator helper for Task object creation |
Engine () |
An generator for trajectory simulation tasks |
Analysis () |
A generator for tasks that represent analysis of trajectories |
Scheduler Functions¶
WorkerScheduler (resource[, verbose]) |
A single instance worker scheduler to interprete Task objects |
Scheduler (resource[, queue, runtime, cores]) |
Class to handle task execution on a resource |
Workers¶
adaptive.Worker`s are the main execution units of your :class:`adaptive.Task
instances.
While the adaptive.Task
object contains specifics about what you want
to happen, like create a trajectory with this length, it does not know anything
about where to run it and how to achieve the goal there. The adaptive.Task
definition is concrete but it misses knowlegde that only the actual
adaptive.Worker
that executes it has. Things like the actual working
directory, (you do not want to interfere with other workers), how to copy
a file from A to B, etc…
There are two ways to use a adaptive.Worker
,
- a manual way in a script, or
- through a stand-alone bash command. That will run a python script which creates a Worker with some options and just runs it until it is shut down.
You will be mostly using the 2. way since it is much simpler and you will typically submit it to the queue and then it will listen in the DB for task to be run in regular intervals.
How does it work¶
Technically a worker gets a task to execute (the task of picking a task from the DB is not solved by the worker!). Then
- A new worker directory is created named according to the task
- It will convert the given task into a bash script (this might involve already copying files from the DB to some folders since this is something that is not handled in a bash script)
- The bash script is executed within the current working directory
- Once it is finished and succeeded the outputs are stored and created files are registered as being existent now.
- A Callback is run, if the task had one
Communication¶
The actual worker will run somewhere on the HPC or as a separate process on your local machine. In both cases the Worker instance will not be present in your execution script or notebook. Hence changes or function you call in your notebook will have no effect to the worker running somewhere else.
Still, any worker that you create through the adaptivemdworker
script will
be stored in the project, so its settings are visible to anyone with access
you your project DB.
Using the BD, you have a way to connect to the worker. You can set a specaicl property which is checked by the running worker in regular intervals and if it takes special values the Worker will act. You could try
The other typical thing that is of interest is the status of the worker
Dead workers¶
This is bad and should not happen, but it can. When a worker dies it does not mean that its execution thread died. The bash script will be run in another thread that is monitored (and should also die if the worker is killed).
Now the worker stalls and stops accepting tasks, etc. What happens?
The worker will continuously send a heartbeat to the DB, which is just a current timestamp. It does this every 10 seconds. You can simply check this by
with the .seen
property.
If it is supposed to write it every 10 seconds and it does not do that for a
minute we get suspicious. When calling project.trigger()
which will also
look for open events to be run, the project also checks, if all workers are
still alive – where alive means that there last alive time is > 60s.
So, if a worker is considered dead, it is sends the kill
command just to make
sure that it will be dead when we will consider it being so and not secretly
keep on working. There would be no problem, if it would sill run correctly but
if it really had failed we want to retry the failed job.
Next, the current task is considered failed and will be restarted. This means
just to set the task.state
to created
. And another worker that is
responding can pick it up. This task will overwrite all files that the failed
task would have generated and so we keep consistent in the database.
RUN adaptivemdworker
¶
the tool adaptivemdworker
takes some options
- usage: adaptivemdworker [-h] [-t [WALLTIME]] [-d [MONGO_DB_PATH]]
- [-g [GENERATORS]] [-w [WRAPPERS]] [-l] [-v] [-a] [–sheep] [-s [SLEEP]] [–heartbeat [HEARTBEAT]] project_name
Run an AdaptiveMD worker
- positional arguments:
- project_name project name the worker should attach to
- optional arguments:
-h, --help show this help message and exit
- -t [WALLTIME], –walltime [WALLTIME]
- minutes until the worker shuts down. If 0 (default) it will run indefinitely
- -d [MONGO_DB_PATH], –mongodb [MONGO_DB_PATH]
- the mongodb url to the db server
- -g [GENERATORS], –generators [GENERATORS]
- a comma separated list of generator names used to dispatch the tasks. the worker will only respond to tasks from generators whose names match one of the names in the given list. Example: –generators=openmm will only run scripts from generators named openmm
- -w [WRAPPERS], –wrappers [WRAPPERS]
- a comma separated list of simple function call to the resource. This can be used to add e.g. CUDA support for specific workers. Example: –wrappers=add_path(“something”),add_cuda_module()
-l, --local if true then the DB is set to the default local port -v, --verbose if true then stdout and stderr of subprocesses will be rerouted. Use for debugging. -a, --allegro if true then the DB is set to the default allegro setting --sheep if true then the DB is set to the default sheep setting
- -s [SLEEP], –sleep [SLEEP]
- polling interval for new jobs in seconds. Default is 2 seconds. Increase to get less traffic on the DB
- –heartbeat [HEARTBEAT]
- heartbeat interval in seconds. Default is 10 seconds.
The folder structure¶
For reference, this is the file structure of adaptiveMD.
{shared_folder}/ # referenced by `shared://` and set in the `Resource`
adaptivemd/ # set in the `Resource`
projects/
{project-name-1}/ # referenced by `project://`
trajs/
00000000/
00000001/
...
models/
workers/ # referenced by `sandbox://`
staging_area/ # referenced by `staging://`
worker.{task_UUID}/ # referenced by `worker://` (only the current one)
...
{shared_folder}
: is specific to your HPC or locally is usually chosen to be$HOME
. The 2.adaptivemd
: is the main folder where we will place all files. You can access the shared folder, there are no restrictions, but this should be restricted to loading input files like previous existing projects, etc. A stored files are place within this directory.projects
: will contain a single folder perProject
, make sure that your project names are short but descriptive to later find files. All files you want to keep for later should be placed here.workers
: this folder is specific to the worker scheduler (there is also the possibility to use radical.pilot which usesradical.pilot.sandbox
). It contains all temporary folders used by the workers to execute your tasks. Each task get a unique folder that also contains the UUID of the task to be handle. It is set up with all files and then in it your task is executed.staging_area
: This is also a temporary folder that contains files that are used by the workers for multiple tasks. Normally a task generating factory knows which files it will need multiple timestrajs
: is a folder used by engines to place trajectories in.
Why do we need a trajectory object?¶
You might wonder why a Trajectory
object is necessary. You could
just build a function that will take these parameters and run a
simulation. At the end it will return the trajectory object. The same
object we created just now.
The main reason is to familiarize you with the general concept of asyncronous execution and so-called Promises. The trajectory object we built is similar to a Promise so what is that exactly?
A Promise is a value (or an object) that represents the result of a
function at some point in the future. In our case it represents a
trajectory at some point in the future. Normal promises have specific
functions do deal with the unknown result, for us this is a little
different but the general concept stands. We create an object that
represents the specifications of a Trajectory
and so, regardless of
the existence, we can use the trajectory as if it would exists:
Get the length
print trajectory.length
100
and since the length is fixed, we know how many frames there are and can access them
print trajectory[20]
Frame(sandbox:///{}/00000001/[20])
ask for a way to extend the trajectory
print trajectory.extend(100)
<adaptivemd.engine.engine.TrajectoryExtensionTask object at 0x110e6e210>
ask for a way to run the trajectory
print trajectory.run()
<adaptivemd.engine.engine.TrajectoryGenerationTask object at 0x110dd46d0>
We can ask to extend it, we can save it. We can reference specific frames in it before running a simulation. You could even build a whole set of related simulations this way without running a single frame. You might understand that this is pretty powerful especially in the context of running asynchronous simulations.
Last, we did not answer why we have two separate steps: Create the trajectory first and then a task from it. The main reason is educational: > It needs to be clear that a ``Trajectory`` *can exist* before running some engine or creating a task for it. The ``Trajectory`` *is not* a result of a simulation action.
Execution Plans¶
You are free to conduct your simulations from a notebook but normally you will use a script. The main point about adaptivity is to make decision about tasks along the way.
We want to first look into a way to run python code asynchroneously in the project. For this, we write a function that should be executed. Inside you will create tasks and submit them.
If the function should pause, use yield
as if you would return
and exit the function. Yield
will allow you to continue at this
yield {condition_to_continue}
This will interrupt your script until the function you return will
return True
when called. An example
def strategy(loops=10, trajs_per_loop=4, length=100):
for loop in range(loops):
# submit some trajectory tasks
trajectories = project.new_ml_trajectory(length, trajs_per_loop)
tasks = map(engine.task_run_trajectory, trajectories)
project.queue(tasks)
# continue if ALL of the tasks are done (can be failed)
yield [task.is_done for task in tasks]
# submit a model job
task = modeller.execute(list(project.trajectories))
project.queue(task)
# when it is done do next loop
yield task.is_done
and add the event to the project (these cannot be stored yet!)
project.add_event(strategy(loops=2))
<adaptivemd.event.FunctionalEvent at 0x10d615050>
What is missing now? The adding of the event triggered the first part of the code. But to recheck if we should continue needs to be done manually.
RP has threads in the background and these can call the trigger whenever something changed or finished.
Still that is no problem, we can do that easily and watch what is happening
Let’s see how our project is growing. TODO: Add threading.Timer to auto trigger.
import time
from IPython.display import clear_output
try:
while project._events:
clear_output(wait=True)
print '# of files %8d : %s' % (len(project.trajectories), '#' * len(project.trajectories))
print '# of models %8d : %s' % (len(project.models), '#' * len(project.models))
sys.stdout.flush()
time.sleep(2)
project.trigger()
except KeyboardInterrupt:
pass
# of files 74 : ##########################################################################
# of models 33 : #################################
Let’s do another round with more loops
project.add_event(strategy(loops=2))
<adaptivemd.event.FunctionalEvent at 0x10d633850>
And some analysis (might have better functions for that)
# find, which frames from which trajectories have been chosen
trajs = project.trajectories
q = {}
ins = {}
for f in trajs:
source = f.frame if isinstance(f.frame, File) else f.frame.trajectory
ind = 0 if isinstance(f.frame, File) else f.frame.index
ins[source] = ins.get(source, []) + [ind]
for a,b in ins.iteritems():
print a.short, ':', b
file://{}/alanine.pdb : [0, 0, 0]
sandbox:///{}/00000005/ : [95, 92, 67, 92]
sandbox:///{}/00000007/ : [11]
sandbox:///{}/00000011/ : [55]
sandbox:///{}/00000000/ : [28, 89, 72]
sandbox:///{}/00000002/ : [106]
sandbox:///{}/00000004/ : [31, 25, 60]
Event¶
And do this with multiple events in parallel.
def strategy2():
for loop in range(10):
num = len(project.trajectories)
task = modeller.execute(list(project.trajectories))
project.queue(task)
yield task.is_done
# continue only when there are at least 2 more trajectories
yield project.on_ntraj(num + 2)
project.add_event(strategy(loops=10, trajs_per_loop=2))
project.add_event(strategy2())
<adaptivemd.event.FunctionalEvent at 0x107744c90>
And now wait until all events are finished.
project.wait_until(project.events_done)
Classes¶
ExecutionPlan (generator) |
An wrap to turn python function into asynchronous execution |