Example 2 - The Tasks¶
Imports¶
In [1]:
import sys, os
In [2]:
from adaptivemd import Project, Event, FunctionalEvent, Trajectory
Let’s open our test
project by its name. If you completed the
previous example this should all work out of the box.
In [3]:
project = Project('tutorial')
Open all connections to the MongoDB
and Session
so we can get
started.
Let’s see where we are. These numbers will depend on whether you run this notebook for the first time or just continue again. Unless you delete your project it will accumulate models and files over time, as is our ultimate goal.
In [4]:
print project.tasks
print project.trajectories
print project.models
<StoredBundle for with 1 file(s) @ 0x10949f5d0>
<ViewBundle for with 0 file(s) @ 0x10949f710>
<StoredBundle for with 0 file(s) @ 0x10949f510>
Now restore our old ways to generate tasks by loading the previously used generators.
In [5]:
engine = project.generators['openmm']
modeller = project.generators['pyemma']
pdb_file = project.files['initial_pdb']
Remember that we stored some files in the database and of course you can look at them again, should that be important.
In [7]:
print pdb_file.get_file()[:1000] + ' [...]'
REMARK 1 CREATED WITH MDTraj 1.8.0, 2016-12-22
CRYST1 26.063 26.063 26.063 90.00 90.00 90.00 P 1 1
MODEL 0
ATOM 1 H1 ACE A 1 -1.900 1.555 26.235 1.00 0.00 H
ATOM 2 CH3 ACE A 1 -1.101 2.011 25.651 1.00 0.00 C
ATOM 3 H2 ACE A 1 -0.850 2.954 26.137 1.00 0.00 H
ATOM 4 H3 ACE A 1 -1.365 2.132 24.600 1.00 0.00 H
ATOM 5 C ACE A 1 0.182 1.186 25.767 1.00 0.00 C
ATOM 6 O ACE A 1 1.089 1.407 26.645 1.00 0.00 O
ATOM 7 N ALA A 2 0.302 0.256 24.807 1.00 0.00 N
ATOM 8 H ALA A 2 -0.588 0.102 24.354 1.00 0.00 H
ATOM 9 CA ALA A 2 1.498 -0.651 24.567 1.00 0.00 C
ATOM 10 HA ALA A 2 1.810 -0.944 25.570 1.00 0.00 H
ATOM 11 CB ALA A 2 1.054 -1.959 23.852 [...]
The Trajectory object¶
Before we talk about adaptivity, let’s have a look at possibilities to generate trajectories.
We assume that you successfully ran a first trajectory using a worker. Next, we talk about lots of ways to generate new trajectories.
Trajectories from a pdb¶
You will do this in the beginning. Remember we already have a PDB stored from setting up the engine. If you want to start from this configuration, do as before:
- create the Trajectory object you want
- make a task
- submit the task to craft the object into existence on the HPC
A trajectory contains all necessary information to make itself. It has
- a (hopefully unique) location: this will be the folder where all the files that belong to the trajectory go,
- an initial frame: the initial configuration used to tell the MD simulation package where to start,
- a length in frames to run, and
- the Engine: the actual engine used to create the trajectory.
Note, the Engine is technically not required unless you want to use
.run(), but it makes sense, because the engine contains information
about the topology and, more importantly, about which output
files are generated. This is the essential information you will need for
analysis, e.g. what is the filename of the trajectory file that contains
the protein structure and what is its stride?
Let’s first build a Trajectory
from scratch
In [8]:
file_name = next(project.traj_name) # get a unique new filename
trajectory = Trajectory(
location=file_name, # this creates a new filename
frame=pdb_file, # initial frame is the PDB
length=100, # length is 100 frames
engine=engine # the engine to be used
)
Since this is tedious to write, there is a shortcut.
In [9]:
trajectory = project.new_trajectory(
frame=pdb_file,
length=100,
engine=engine,
number=1 # if more than one you get a list of trajectories
)
Like in the first example, now that we have the parameters of the
Trajectory
we can create the task to do that.
The Task object¶
First, an example
In [13]:
task_run = engine.run(trajectory)
This was easy, but we can do some more interesting stuff. Since we know the trajectory will exist after the run, we can also extend it by some frames. Remember, the trajectory does not really exist yet (not until we submit it and a worker executes it), but we can pretend that it does, since its relevant properties are set.
In [14]:
task_extend = engine.extend(trajectory, 50)
The only problem is to make sure the tasks are run in the correct order. This would not be a problem if the worker ran tasks in the order they are placed in the queue, but that defeats the purpose of parallel runs. Therefore an extended task knows that it depends on the existence of the source trajectory. The worker will hence only run the extension once the source exists.
A queueing system?¶
We might wonder at this point how we manage to construct the dependency graph between all tasks, how it is handled and optimized, etc.
Well, we don't. There is no dependency graph, at least not explicitly. All we do is check, among all tasks that should be run, which of them can be run at a given time. And this is easy to check: all dependent tasks need to be completed and must have succeeded. Then we can rely on their (the dependencies') results to exist and it makes sense to continue.
A real dependency graph would go even further and know about all future relations, and you could identify bottleneck tasks which are necessary to allow other tasks to run. We don't do that (yet). It could improve performance in the sense that you would run at optimal load balance and keep all workers as busy as possible. Consider our attempt a first-order dependency graph.
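This first-order check can be sketched as follows. This is an illustrative sketch only (Python 3 syntax); the dict-based tasks and the runnable helper are hypothetical, not the actual adaptivemd API.

```python
# A task may run only when every task it depends on has succeeded.
def runnable(task, states):
    """True if all dependencies of `task` have state 'success'."""
    return all(states.get(dep) == 'success' for dep in task['dependencies'])

states = {'run-00000000': 'success'}
task_extend = {'name': 'extend-00000000', 'dependencies': ['run-00000000']}
print(runnable(task_extend, states))  # True: the source trajectory task succeeded
```

No graph is ever built; each scheduling pass simply re-evaluates this predicate over the queued tasks.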
In [15]:
project.queue(task_run, task_extend)
A note on simulation length¶
Remember that we allow an engine to output multiple trajectory types with freely chosen strides? This could lead to trouble. Imagine this (unrealistic) situation:
We have
1. a full trajectory with stride=10, and
2. a reduced protein-only trajectory with stride=7.
Now run a trajectory of length=300. We get
- 30+1 full frames (+1 for the initial frame) and
- 42+1 protein frames
That per se is no problem, but if you want to extend, we only have a restart file for the very last frame, and while this works for the full trajectory, for the protein trajectory you are 6 frames short. Just continuing and concatenating the two leads to a gap of 6+7=13 frames instead of 7. A small but potentially significant source of error.
So, compute the least common multiple of all strides using
In [16]:
engine.native_stride
Out[16]:
10
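The native stride is just the least common multiple of all output strides; a trajectory whose length is a multiple of it leaves every output type restartable. A minimal sketch of the computation (illustrative, not the adaptivemd implementation):

```python
from functools import reduce
from math import gcd

def lcm_strides(strides):
    """Least common multiple of all output strides."""
    return reduce(lambda a, b: a * b // gcd(a, b), strides)

print(lcm_strides([1, 10]))  # 10: protein stride 1 and master stride 10, as here
print(lcm_strides([10, 7]))  # 70: a safe length for the unrealistic example above
```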
Simpler function calls¶
There is also a shorter way of writing this
In [17]:
# task = trajectory.run().extend(50)
This will create two tasks: one that runs the trajectory and one that then extends it by 50 frames (in native engine frames).
If you want to do that several times, you can pass a list of ints which
is a shortcut for calling .extend(l1).extend(l2). ...
In [18]:
# task = trajectory.run().extend([10] * 10)
This will create 10 tasks, each of which extends the previous one. Each of the tasks requires the previous one to finish; this way the dependency is preserved. You can use this to mimic using several restarts in between, and it also means that you have no idea which worker will actually start and which worker will continue or finish a trajectory.
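The bookkeeping behind the chained call can be sketched like this. The ChainedTask class is hypothetical, just to illustrate how a list of lengths reduces to a chain of dependent extensions:

```python
class ChainedTask:
    def __init__(self, length):
        self.total = length   # frames produced so far
        self.tasks = 1        # the initial run task

    def extend(self, lengths):
        """Accept an int or a list of ints; each entry adds one dependent task."""
        if isinstance(lengths, int):
            lengths = [lengths]
        for step in lengths:
            self.total += step
            self.tasks += 1
        return self

t = ChainedTask(100).extend([10] * 10)
print(t.total, t.tasks)  # 200 11: the initial run plus 10 chained extensions
```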
Checking the results¶
For a second, let's see if everything went fine.
In [60]:
for t in project.trajectories:
print t.short, t.length
sandbox:///{}/00000000/ 150
sandbox:///{}/00000003/ 100
sandbox:///{}/00000005/ 100
sandbox:///{}/00000006/ 100
sandbox:///{}/00000007/ 100
sandbox:///{}/00000008/ 100
sandbox:///{}/00000009/ 100
If this works, you should see one 100-frame trajectory from the setup (first example) and a second trajectory of length 150 that we just generated by running 100 frames and extending by another 50.
If not, there might be a problem or (more likely) the tasks are not finished yet. Just try the above cell again and see if it changes to the expected output.
project.trajectories will show you only existing trajectories, not
ones that are planned or have been extended. If you want to see all the
ones already in the project, you can look at project.files, which is
a bundle, and bundles can be filtered. But first, all files:
In [63]:
for f in project.files:
print f
file:///Users/jan-hendrikprinz/Studium/git/adaptivemd/adaptivemd/scripts/_run_.py
file:///Users/jan-hendrikprinz/Studium/git/adaptivemd/adaptivemd/engine/openmm/openmmrun.py
file:///Users/jan-hendrikprinz/Studium/git/adaptivemd/examples/files/alanine/alanine.pdb
file:///Users/jan-hendrikprinz/Studium/git/adaptivemd/examples/files/alanine/system.xml
file:///Users/jan-hendrikprinz/Studium/git/adaptivemd/examples/files/alanine/integrator.xml
sandbox:///projects/tutorial/trajs/00000000/
sandbox:///projects/tutorial/trajs/00000000/
sandbox:///projects/tutorial/trajs/00000001/
sandbox:///projects/tutorial/trajs/00000002/
sandbox:///projects/tutorial/trajs/00000003/
sandbox:///projects/tutorial/trajs/00000005/
file:///Users/jan-hendrikprinz/Studium/git/adaptivemd/examples/tutorial/_rpc_input_0x43a0c8dc148311e7acff0000000001a0L.json
file:///Users/jan-hendrikprinz/Studium/git/adaptivemd/adaptivemd/scripts/_run_.py
file:///Users/jan-hendrikprinz/Studium/git/adaptivemd/examples/tutorial/_rpc_output_0x43a0c8dc148311e7acff0000000001a0L.json
project:///models/model.0x43a0c8dc148311e7acff0000000001a0L.json
sandbox:///projects/tutorial/trajs/00000006/
sandbox:///projects/tutorial/trajs/00000007/
sandbox:///projects/tutorial/trajs/00000008/
sandbox:///projects/tutorial/trajs/00000009/
Now all files filtered by class Trajectory (the .c filter). DT is a little
helper to convert time stamps into something readable.
In [66]:
from adaptivemd import DT
In [75]:
for t in project.files.c(Trajectory):
print t.short, t.length,
if t.created:
if t.created > 0:
print 'created @ %s' % DT(t.created)
else:
print 'modified @ %s' % DT(-t.created)
else:
print 'not existent'
sandbox:///{}/00000000/ 100 modified @ 2017-03-29 15:57:38
sandbox:///{}/00000000/ 150 created @ 2017-03-29 15:57:38
sandbox:///{}/00000001/ 100 not existent
sandbox:///{}/00000002/ 100 not existent
sandbox:///{}/00000003/ 100 created @ 2017-03-29 15:58:55
sandbox:///{}/00000005/ 100 created @ 2017-03-29 15:59:59
sandbox:///{}/00000006/ 100 created @ 2017-03-29 16:01:27
sandbox:///{}/00000007/ 100 created @ 2017-03-29 16:01:33
sandbox:///{}/00000008/ 100 created @ 2017-03-29 16:01:39
sandbox:///{}/00000009/ 100 created @ 2017-03-29 16:01:45
You see that the extended trajectory appears twice, once with length 100
and once with length 150. This is correct, because the idea of a
100-frame trajectory was used first and hence is saved. But why does this
one not appear in the list of trajectories? It was created first and had
a timestamp of creation written to .created. This is the time when the
worker finished and was successful. At the same time, all files that are
overwritten are marked as modified by setting a negative timestamp.
So if
- .created is None, the file does not exist nor has it ever,
- .created > 0, the file exists,
- .created < 0, the file existed but has been overwritten.
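The timestamp convention can be captured in a tiny helper (illustrative sketch; file_status is not part of adaptivemd):

```python
def file_status(created):
    """Interpret the .created timestamp convention described above."""
    if created is None:
        return 'not existent'
    return 'exists' if created > 0 else 'overwritten'

print(file_status(None))         # 'not existent'
print(file_status(1490800658))   # 'exists'
print(file_status(-1490800658))  # 'overwritten': negated on modification
```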
Finally, all project.trajectories are files of type Trajectory with a positive created timestamp.
Dealing with errors¶
Let’s do something stupid and produce an error by using a wrong initial pdb file.
In [26]:
trajectory = project.new_trajectory(engine['system_file'], 100)
task = engine.run(trajectory)
project.queue(task)
Well, nothing changed obviously and we expect it to fail. So let’s inspect what happened.
In [32]:
task.state
Out[32]:
u'fail'
You might need to execute this cell several times. It will first become
queued
, then running
and finally fail
and stop there.
It failed. Well, we kind of knew that. No surprise here, but why? Let's look at the stdout and stderr.
In [33]:
print task.stdout
15:58:14 [worker:3] stdout from running task
GO...
Reading PDB
In [34]:
print task.stderr
15:58:14 [worker:3] stderr from running task
Traceback (most recent call last):
File "openmmrun.py", line 169, in <module>
pdb = PDBFile(args.topology_pdb)
File "/Users/jan-hendrikprinz/anaconda/lib/python2.7/site-packages/simtk/openmm/app/pdbfile.py", line 159, in __init__
self.positions = self._positions[0]
IndexError: list index out of range
We see what we expect: in openmmrun.py, the OpenMM run script, the PDB file could not be loaded.
NOTE: If your worker dies for some reason, it will not set a STDOUT or STDERR. If you think that your task should be able to execute, then you can do task.state = 'created' and reset it to be accessible to workers. This is NOT recommended, just to explain how this works. Of course you need a new worker anyway.
What else¶
If you have a Trajectory object and want to create the real trajectory
file, you can also put the Trajectory directly into the queue. This is
equivalent to calling .run on the trajectory and submitting the resulting
Task to the queue. The only downside is that you do not see the task
object and cannot directly work with it, check its status, etc.
In [76]:
# project.queue(project.new_trajectory(pdb_file, 100, engine).run()) can be called as
project.queue(project.new_trajectory(pdb_file, 100, engine))
Trajectories from other trajectories¶
This will be the most common case. In anything remotely adaptive you will not always start from the same position or just extend. You want to pick any existing frame and continue from there. So, let's do that.
First we get a trajectory. Every Bundle in the project (e.g.
.trajectories, .models, .files, .tasks) acts like an
enhanced set. You can iterate over all entries as we did before, and you
can get one element, which usually is the first stored, but not always.
If you are interested in Bundles, see the documentation. For now
it is enough to know that a bundle (as a set) has a .one
property, which is short for getting the first object if you iterate, as
if you would call next(project.trajectories). Note that the iterator does
not ensure a fixed order. You literally might get any object, if there
is at least one.
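The set-like behaviour can be sketched with a minimal stand-in class (MiniBundle is hypothetical, not adaptivemd.Bundle):

```python
class MiniBundle:
    """A set-like container whose .one returns some element, order not guaranteed."""
    def __init__(self, items):
        self._items = set(items)

    def __iter__(self):
        return iter(self._items)

    @property
    def one(self):
        # first object of the (unordered) iteration, or None if empty
        return next(iter(self._items), None)

b = MiniBundle(['traj-0', 'traj-1', 'traj-2'])
print(b.one in {'traj-0', 'traj-1', 'traj-2'})  # True; which one is unspecified
```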
In [36]:
trajectory = project.trajectories.one
Good, at least 100 frames. We pick, say, the frame at index 28 (which is
the 29th frame; we start counting at zero) using the way you pick an
element from a Python list (which is almost what a Trajectory
represents: a list of frames).
In [38]:
frame = trajectory[28]
print frame, frame.exists
Frame(sandbox:///{}/00000000/[28]) False
In [39]:
frame = trajectory[30]
print frame, frame.exists
Frame(sandbox:///{}/00000000/[30]) True
This part is important! We are running only one full-atom trajectory with a stride larger than one, so if you want to pick a frame from this trajectory you can in theory pick any frame, but only some of them really exist. If you want to restart from a frame, it needs to be one that exists. Otherwise you run into trouble.
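The existence rule is simple: with an output stride, only frames at multiples of that stride are ever written. A sketch (frame_exists is illustrative, not the adaptivemd check):

```python
def frame_exists(index, stride=10):
    """A frame is stored in a strided output only if its index is a
    multiple of the stride."""
    return index % stride == 0

print(frame_exists(28))  # False: frame 28 was never written with stride 10
print(frame_exists(30))  # True: frame 30 is stored as file index 3
```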
To run a trajectory just use the frame as the initial frame.
In [40]:
frame = trajectory[28]
task = project.new_trajectory(frame, 100, engine).run()
print task
None
In [41]:
frame = trajectory[30]
task = project.new_trajectory(frame, 100, engine).run()
print task
<adaptivemd.engine.engine.TrajectoryGenerationTask object at 0x10360f4d0>
In [42]:
print task.description
Task: TrajectoryGenerationTask(OpenMMEngine) [created]
Sources
- staging:///integrator.xml
- staging:///system.xml
- staging:///alanine.pdb
- staging:///openmmrun.py
- sandbox:///{}/00000000/ [exists]
Targets
- sandbox:///{}/00000005/
Modified
<pretask>
Link('staging:///alanine.pdb' > 'worker://initial.pdb)
Link('staging:///system.xml' > 'worker://system.xml)
Link('staging:///integrator.xml' > 'worker://integrator.xml)
Link('staging:///openmmrun.py' > 'worker://openmmrun.py)
Link('sandbox:///{}/00000000/' > 'worker://source/)
mdconvert -o worker://input.pdb -i 3 -t worker://initial.pdb worker://source/master.dcd
Touch('worker://traj/')
python openmmrun.py -r --report-interval 1 -p CPU --types="{'protein':{'stride':1,'selection':'protein','filename':'protein.dcd'},'master':{'stride':10,'selection':null,'filename':'master.dcd'}}" -t worker://input.pdb --length 100 worker://traj/
Move('worker://traj/' > 'sandbox:///{}/00000005/)
<posttask>
See how the actual frame picked in the mdconvert line is -i 3,
meaning index 3, which represents frame 30 with stride 10.
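The mapping from a global frame number to the index inside the strided file is a plain integer division; a sketch (file_index is a hypothetical helper):

```python
def file_index(frame, stride=10):
    """Map a global frame number to its index inside the strided file.
    Only valid when the frame was actually written (frame % stride == 0)."""
    assert frame % stride == 0, 'frame was never written to this file'
    return frame // stride

print(file_index(30))   # 3, matching the -i 3 in the mdconvert line above
print(file_index(100))  # 10
```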
Now, run the task.
In [43]:
project.queue(task)
Btw, you can wait until something happens using
project.wait_until(condition). This is not so useful in notebooks,
but in scripts it is. condition here is a function that evaluates
to True or False. It will be tested at regular intervals and
once it is True the function returns.
In [44]:
project.wait_until(task.is_done)
In [45]:
task.state
Out[45]:
u'success'
Each Task has a function is_done that you can use as such a condition.
It becomes True once the task is done: that means it either failed,
succeeded, or was cancelled. Basically, when it is not queued anymore.
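The polling behaviour can be sketched in a few lines (an illustrative stand-in; the real project.wait_until may differ in interval and details):

```python
import time

def wait_until(condition, interval=0.01):
    """Poll `condition` at regular intervals until it returns True."""
    while not condition():
        time.sleep(interval)

calls = {'n': 0}
def is_done():
    calls['n'] += 1
    return calls['n'] >= 3  # pretend the task finishes on the third poll

wait_until(is_done)
print(calls['n'])  # 3
```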
If you want to run adaptively, all you need to do is to figure out where to start new simulations from and use the methods provided to run these.
Model tasks¶
There are of course other things you can do besides creating new trajectories
In [46]:
from adaptivemd.analysis.pyemma import PyEMMAAnalysis
PyEMMAAnalysis is the generator used to compute an MSM model of the
existing trajectories that you pass to it. It is initialized with a .pdb
file that is used to create features between the \(c_\alpha\) atoms. This
implementation requires a PDB, but in general this is not necessary. It
is specific to this PyEMMAAnalysis show case.
In [47]:
modeller = PyEMMAAnalysis(
engine=engine,
outtype='protein',
features={'add_inverse_distances': {'select_Backbone': None}}
).named('pyemma')
Again we name it pyemma for later reference.
The other two options choose which output type from the engine we want to analyse. We choose the protein trajectories since these are faster to load and have better time resolution.
The features dict expresses which features to use. In our case, all inverse distances among the selected backbone atoms.
A model-generating task works similarly to trajectories. You create the
generator with options (so far; this will become more complex in the
future) and then you create a Task by passing it a list of
trajectories to be analyzed.
In [48]:
task = modeller.execute(list(project.trajectories))
project.queue(task)
In [49]:
project.wait_until(task.is_done)
In [52]:
for m in project.models:
print m
<adaptivemd.model.Model object at 0x1036bff50>
So we generated one model. The Model
objects contain (in the base
version) only a .data
attribute which is a dictionary of information
about the generated model.
In [53]:
model = project.models.last
In [54]:
print model['msm']['P']
[[ 0.84615385 0. 0. 0.07701397 0.07683217]
[ 0. 0.94936708 0.02307278 0.02756013 0. ]
[ 0. 0.02591964 0.94047619 0.00963989 0.02396427]
[ 0.01328607 0.05096999 0.01586998 0.89333333 0.02654062]
[ 0.01433636 0. 0.04267144 0.02870648 0.91428572]]
Pick frames automatically¶
The last thing that is implemented is a function that can utilize models to decide which frames are better to start from. The simplest one will use the counts per state, take the inverse and use this as a distribution.
In [55]:
project.find_ml_next_frame(4)
Out[55]:
[Frame(sandbox:///{}/00000003/[40]),
Frame(sandbox:///{}/00000003/[20]),
Frame(sandbox:///{}/00000005/[50]),
Frame(sandbox:///{}/00000003/[20])]
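The inverse-counts strategy described above can be sketched as follows (pick_states is illustrative, not the adaptivemd implementation):

```python
import random

def pick_states(counts, n, seed=0):
    """Pick n states with probability proportional to 1/counts, so rarely
    visited states are favoured as restart candidates."""
    weights = [1.0 / c for c in counts]
    rng = random.Random(seed)
    return rng.choices(range(len(counts)), weights=weights, k=n)

picked = pick_states([1000, 10, 12, 500, 5], 4)
print(len(picked), all(0 <= s < 5 for s in picked))  # 4 True
```

From each picked state, one of its member frames would then be chosen as the starting point of a new trajectory.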
So you can pick states according to the newest (last) model. (This will be moved to the Brain.) And since we want trajectories with these frames as starting points, there is also a function for that.
In [56]:
trajectories = project.new_ml_trajectory(length=100, number=4, engine=engine)
trajectories
Out[56]:
[Trajectory(Frame(sandbox:///{}/00000000/[10]) >> [0..100]),
Trajectory(Frame(sandbox:///{}/00000003/[50]) >> [0..100]),
Trajectory(Frame(sandbox:///{}/00000000/[10]) >> [0..100]),
Trajectory(Frame(sandbox:///{}/00000003/[20]) >> [0..100])]
Let’s submit these before we finish this notebook with a quick discussion of workers
In [57]:
project.queue(trajectories)
That’s it.
The Worker objects¶
Workers are the instances that execute tasks for you. If you did not stop the worker in the command line it will still be running and you can check its state.
In [59]:
project.trigger()
for w in project.workers:
if w.state == 'running':
print '[%s:%s] %s:%s' % (w.state, DT(w.seen).time, w.hostname, w.cwd)
[running:16:01:47] Stevie.fritz.box:/Users/jan-hendrikprinz/Studium/git/adaptivemd
Okay, the worker is running, was last reporting its heartbeat at the
time shown, and has a hostname and current working directory (where it
was executed from). The generators attribute specifies from which
generators the worker executes tasks. If it is None, the worker runs
all tasks it finds. You can use this to run specific workers for models
and others for trajectory generation.
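The selection rule can be sketched as a one-line predicate (accepts is a hypothetical helper; the real logic lives inside the adaptivemd worker):

```python
def accepts(worker_generators, task_generator):
    """A worker with generators=None runs any task; otherwise it only
    runs tasks coming from one of the listed generators."""
    return worker_generators is None or task_generator in worker_generators

print(accepts(None, 'openmm'))        # True: unrestricted worker
print(accepts(['pyemma'], 'openmm'))  # False: model-only worker skips MD tasks
print(accepts(['pyemma'], 'pyemma'))  # True
```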
You can also control it remotely by sending it a command. shutdown
will shut it down for you.
In [77]:
# project.workers.last.command = 'shutdown'
Afterwards you need to restart your worker to continue with these examples.
If you want to control Worker
objects look at the documentation.
In [101]:
project.close()