##############################################################################
# adaptiveMD: A Python Framework to Run Adaptive Molecular Dynamics (MD)
# Simulations on HPC Resources
# Copyright 2017 FU Berlin and the Authors
#
# Authors: Jan-Hendrik Prinz
# Contributors:
#
# `adaptiveMD` is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 2.1
# of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with MDTraj. If not, see <http://www.gnu.org/licenses/>.
##############################################################################
from file import AddPathAction, FileAction, FileTransaction, MakeDir, \
Copy, Transfer, Link, Move, Remove, Touch
import os
[docs]class ActionParser(object):
"""
A class that can interprete actions into scheduler understandable commands
"""
[docs] def __init__(self):
self.parent = None
self.scheduler = None
[docs] def parse(self, scheduler, action):
"""
Parse a single action
Parameters
----------
scheduler : `Scheduler`
the used scheduler which knows about specifics in the
parsing process, like, e.g., file paths
action : `Action` or dict or list of str
the actual action to be parsed
Returns
-------
list of `Action` or dict or str
"""
return action
def __call__(self, scheduler, actions):
return self._f([self.parse(scheduler, x) for x in actions])
def __rshift__(self, other):
return ChainedParser(self, other)
@staticmethod
def _f(actions):
"""
Flatten lists
Returns
-------
list of str or dict or `Action`
"""
return filter(bool, sum([x if isinstance(x, list) else [x] for x in actions], []))
[docs]class DictFilterParser(ActionParser):
[docs] def parse(self, scheduler, action):
if isinstance(action, dict):
return action
return None
[docs]class StrFilterParser(ActionParser):
[docs] def parse(self, scheduler, action):
if isinstance(action, basestring):
return action
return None
[docs]class ChainedParser(ActionParser):
"""
Parser that represents the chained application of two parser
"""
[docs] def __init__(self, parent, child):
super(ChainedParser, self).__init__()
self.parent = parent
self.child = child
def __call__(self, scheduler, actions):
return self.parent(scheduler, self.child(scheduler, actions))
[docs]class StageInParser(ActionParser):
"""
Special parser that can interpret actions into RP stage-in phase
"""
[docs] def parse(self, scheduler, action):
if isinstance(action, FileTransaction):
source = action.source
target = action.target
sp = source.url
tp = target.url
ret = {
'source': sp,
'target': tp,
'action': 'Transfer' # rp.TRANSFER
}
return ret
return action
[docs]class BashParser(ActionParser):
[docs] def parse(self, scheduler, action):
if isinstance(action, FileAction):
sp = action.source.url
sd = sp.split('://')[0]
if sd == 'worker':
sp = sp.split('://')[1]
if isinstance(action, Transfer):
if sd == 'file':
sp = sp.split('://')[1]
if isinstance(action, Remove):
return ['rm %s %s' % (
'-r' if action.source.is_folder else '', sp)]
elif isinstance(action, Touch):
if action.source.is_folder:
return ['mkdir -p %s' % sp]
else:
return ['touch %s' % sp]
elif isinstance(action, MakeDir):
if action.source.is_folder:
return ['mkdir -p %s' % sp]
elif isinstance(action, FileTransaction):
if action.target.is_folder == action.source.is_folder:
# file to file and folder to folder
rules = stage_rules[action.__class__]
if rules['bash_cmd']:
tp = action.target.url
td = action.target.drive
if isinstance(action, Move) and action.source.is_folder:
# we cannot just replace an existing folder using `mv`
# easiest way is to just move all source files
# this will create `mv source/* target/ and mv all files in
# source to target and overwrite the targets as we expect
return [
'mkdir -p %s' % tp, # create target dir if not exist
'mv %s* %s' % (sp, tp), # move all files
'rm -r %s' % sp] # remove source dir
else:
if isinstance(action, Transfer):
if td == 'file':
tp = tp.split('://')[1]
if isinstance(action, Link):
# links must not end in `/`
if action.target.is_folder:
tp = tp[:-1]
sp = sp[:-1]
return ['%s %s %s' % (rules['bash_cmd'], sp, tp)]
else:
if isinstance(action, AddPathAction):
return ['export PATH=%s:$PATH' % action.path]
return action
[docs]class StageParser(ActionParser):
"""
Parse into possible RP Stage commands for ComputeUnits
"""
[docs] def parse(self, scheduler, action):
sa_location = scheduler.staging_area_location
if isinstance(action, FileAction):
sp = action.source.url
# useful for RP only
if sp.startswith(sa_location):
sp = 'staging://' + sp.split(sa_location)[1]
sd = sp.split('://')[0]
if sd == 'worker':
sp = sp.split('://')[1]
if isinstance(action, Transfer):
if sd == 'file':
sp = sp.split('://')[1]
if isinstance(action, FileTransaction):
tp = action.target.url
td = action.target.drive
if td == 'worker':
tp = tp.split('://')[1]
if isinstance(action, Transfer):
if td == 'file':
tp = tp.split('://')[1]
rules = stage_rules[action.__class__]
signature = (sd, td)
action_models = rules['folder' if action.source.is_folder else 'file']
action_mode = action_models.get(signature)
if action_mode == 'stage':
ret = {
'source': sp,
'target': tp,
'action': rules['rp_action']
}
return ret
return action
[docs]class WorkerParser(ActionParser):
"""
A parser that can interprete transactions from/to ``file://`` for workers
This will write the files to the target location instead of a real
transaction. It requires the file to be stored in the DB using ``load()``
"""
[docs] def parse(self, scheduler, action):
# all of this is to keep RP compatibility which works with files
if isinstance(action, FileTransaction):
source = action.source
target = action.target
# create file from
sp = scheduler.replace_prefix(source.url)
tp = scheduler.replace_prefix(target.url)
if source.drive == 'file' and target.drive != 'file':
if source.has_file:
with open(tp, 'w') as f:
f.write(source.get_file())
return ['# write file `%s` from DB' % tp]
elif os.path.exists(sp):
# in case someone already created the file we need, rename it
if sp != tp:
return ['ln %s %s' % (sp, tp)]
else:
return ['# Could not write or rename file', action]
elif target.drive == 'file' and source.drive != 'file':
return ['ln -s %s %s' % (sp, tp)]
return action
[docs]class PrefixParser(ActionParser):
"""
Replace all adaptiveMD prefixes
Usually the last step to convert all file paths
"""
[docs] def parse(self, scheduler, action):
if isinstance(action, basestring):
# a bash command, look for prefixes to be parsed
return [scheduler.replace_prefix(action)]
return action
# a list of RP implementations and bash commands for each `Action`
stage_rules = {
Copy: {
'file': {
('staging', 'worker'): 'stage',
('worker', 'staging'): 'stage',
('sandbox', 'worker'): 'bash',
('shared', 'worker'): 'bash',
('worker', 'shared'): 'bash',
('shared', 'shared'): 'bash',
('shared', 'staging'): 'bash',
('staging', 'shared'): 'bash'
},
'folder': {
('staging', 'worker'): 'bash',
('worker', 'staging'): 'bash',
('sandbox', 'worker'): 'bash',
('shared', 'worker'): 'bash',
('worker', 'shared'): 'bash',
('shared', 'shared'): 'bash',
('shared', 'staging'): 'bash',
('staging', 'shared'): 'bash'
},
'bash_cmd': 'cp',
'rp_action': 'Copy' # rp.COPY
},
Transfer: {
'file': {
('file', 'worker'): 'stage',
('file', 'staging'): 'stage',
('staging', 'worker'): 'stage',
('staging', 'file'): 'stage',
('worker', 'staging'): 'stage',
('worker', 'file'): 'stage'
},
'folder': {
},
'bash_cmd': None,
'rp_action': 'Transfer' # rp.TRANSFER
},
Move: {
'file': {
('staging', 'worker'): 'stage',
('worker', 'staging'): 'stage',
('sandbox', 'worker'): 'bash',
('shared', 'worker'): 'bash',
('worker', 'shared'): 'bash',
('shared', 'shared'): 'bash',
('shared', 'staging'): 'bash',
('staging', 'shared'): 'bash'
},
'folder': {
('staging', 'worker'): 'bash',
('worker', 'staging'): 'bash',
('sandbox', 'worker'): 'bash',
('shared', 'worker'): 'bash',
('worker', 'shared'): 'bash',
('shared', 'shared'): 'bash',
('shared', 'staging'): 'bash',
('staging', 'shared'): 'bash'
},
'bash_cmd': 'mv',
'rp_action': 'Move' # rp.MOVE
},
Link: {
'file': {
('staging', 'worker'): 'stage',
('sandbox', 'worker'): 'bash',
('shared', 'worker'): 'bash'
},
'folder': {
('staging', 'worker'): 'bash',
('sandbox', 'worker'): 'bash',
('shared', 'worker'): 'bash'
},
'bash_cmd': 'ln -s',
'rp_action': 'Link' # rp.LINK
}
}