# Source code for adaptivemd.task

##############################################################################
# adaptiveMD: A Python Framework to Run Adaptive Molecular Dynamics (MD)
#             Simulations on HPC Resources
# Copyright 2017 FU Berlin and the Authors
#
# Authors: Jan-Hendrik Prinz
# Contributors:
#
# `adaptiveMD` is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 2.1
# of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with MDTraj. If not, see <http://www.gnu.org/licenses/>.
##############################################################################


import os

from file import File, JSONFile, FileTransaction
from util import get_function_source
from mongodb import StorableMixin, SyncVariable, ObjectSyncVariable


class BaseTask(StorableMixin):
    """
    Base class of all tasks: a main script plus `$PATH` and environment setup

    The full script of a task is the concatenation of the generated
    ``export`` lines (see `pre_exec`) and the main part (see `main`).

    Attributes
    ----------
    _main : list of str or `Action`
        the main part of the script
    _add_paths : list of str
        paths to be prepended to the `$PATH` variable
    _environment : dict str : str
        environment variables to be exported and their values

    """

    # attributes that are copied by `to_dict` / `from_dict`
    _copy_attributes = [
        '_main', '_add_paths', '_environment'
        ]

    def __init__(self):
        super(BaseTask, self).__init__()

        self._main = []

        self._add_paths = []
        self._environment = {}

    @staticmethod
    def _format_export_paths(paths):
        """
        Turn a list of paths into bash lines prepending them to `$PATH`

        Parameters
        ----------
        paths : iterable of str
            the paths to be added. Duplicates are removed

        Returns
        -------
        list of str
            one ``export PATH=...`` line per unique path, sorted by path

        """
        # build a real list (not a lazy `map` object) since the result is
        # concatenated with other lists in `pre_exec`
        return [
            'export PATH={}:$PATH'.format(path)
            for path in sorted(set(paths))]

    @staticmethod
    def _format_environment(env):
        """
        Turn a dict of environment variables into bash ``export`` lines

        Parameters
        ----------
        env : dict str : str or None
            the environment variables and their values

        Returns
        -------
        list of str
            one ``export KEY=VALUE`` line per variable, sorted by key.
            Empty if `env` is empty or None

        """
        if not env:
            return []

        return [
            'export {0}={1}'.format(key, env[key])
            for key in sorted(env)]

    @property
    def pre_add_paths(self):
        """
        list of str
            the list of added paths to the $PATH variable by this task

        """
        return self._add_paths

    @property
    def environment(self):
        """
        dict str : str
            the dict of environment variables and their assigned value

        """
        return self._environment

    @property
    def pre_exec(self):
        """
        list of str or `Action`
            the list of actions to be run before the main script. Contains
            the `$PATH` additions followed by the environment variables

        """
        return (
            self._format_export_paths(self.pre_add_paths) +
            self._format_environment(self.environment))

    @property
    def main(self):
        """
        list of str or `Action`
            the main part of the script

        """
        return self._main

    @property
    def script(self):
        """
        list of str or `Action`
            the full script of this task. This is what is send to a worker
            and parsed by it

        """
        return self.pre_exec + self.main

    def add_path(self, path):
        """
        Add one or more paths to the `$PATH` variable of the task

        Parameters
        ----------
        path : (list of) str
            a (list of) path(s) to be added to the $PATH variable before
            task execution. Values that are neither a `str` nor a list or
            tuple are ignored

        """
        if isinstance(path, str):
            self._add_paths.append(path)
        elif isinstance(path, (list, tuple)):
            self._add_paths.extend(path)

    def __rshift__(self, other):
        """
        The `>>` can be used to wrap a task in one another.

        The outer task must have pre and post and the inner will use the
        full script

        Parameters
        ----------
        other : `PrePostTask` or None
            the task that wraps the current task

        Returns
        -------
        `EnclosedTask` or `BaseTask` or None
            the representation of a wrapped task. If `other` is None the
            task itself is returned unwrapped. For any other object that is
            not a `PrePostTask` the result is None

        """
        if other is None:
            return self

        if isinstance(other, PrePostTask):
            return EnclosedTask(self, other)

        # not wrappable: the original code fell through without a return
        return None

    def to_dict(self):
        """
        Returns
        -------
        dict
            a dict of all attributes named in `_copy_attributes`

        """
        return {c: getattr(self, c) for c in self._copy_attributes}

    @classmethod
    def from_dict(cls, dct):
        """
        Create a task from a dict as returned by `to_dict`

        Parameters
        ----------
        dct : dict
            attribute names and their values. Attributes named in
            `_copy_attributes` that are missing in `dct` are set to None

        Returns
        -------
        `BaseTask`
            a new instance of `cls`

        """
        task = cls()

        for c in cls._copy_attributes:
            setattr(task, c, dct.get(c))

        return task


class Task(BaseTask):
    """
    A description for a task running on an HPC

    Attributes
    ----------
    worker : :class:`~adaptivemd.worker.WorkingScheduler`
        the currently assigned Worker instance (not the scheduler!)
    generator : :class:`~adaptivemd.generator.TaskGenerator`
        if given the :class:`~adaptivemd.generator.TaskGenerator` that was
        used to create this task
    state : str
        a string representing the current state of the execution. One of

        - 'created' : task has been created and is available for execution
        - 'submit' : assigned by `fire` when the `submit` event is fired
        - 'running': task is currently executed by a scheduler
        - 'queued' : task has been captured by a worker for execution
        - 'fail' : task has completed but failed. You can restart it
        - 'success' : task has completed and succeeded.
        - 'halted' : task has been halted by user. You can restart it
        - 'cancelled' : task has been cancelled by user. You CANNOT restart it

    stdout : :class:`~adaptivemd.logentry.LogEntry`
        After completion you can access the stdout of the task here
    stderr : :class:`~adaptivemd.logentry.LogEntry`
        After completion you can access the stderr of the task here

    """

    # names of events that callbacks can be registered for / that can be fired
    _events = ['submit', 'fail', 'success', 'change']

    _copy_attributes = BaseTask._copy_attributes + [
        'stdout', 'stderr', 'restartable', 'cleanup', 'generator',
        'dependencies', 'state', 'worker'
        ]

    _find_by = ['state', 'worker', 'stderr', 'stdout']

    # NOTE(review): the callables passed to the sync variables presumably
    # decide when a value is final and need not be synced anymore -- confirm
    # against the `mongodb` module
    state = SyncVariable('state', lambda x: x in ['success', 'cancelled'])
    worker = ObjectSyncVariable('worker', 'workers')
    stdout = ObjectSyncVariable('stdout', 'logs', lambda x: x is not None)
    stderr = ObjectSyncVariable('stderr', 'logs', lambda x: x is not None)

    FINAL_STATES = ['success', 'cancelled']
    RESTARTABLE_STATES = ['fail', 'halted']
    RUNNABLE_STATES = ['created']

    def __init__(self, generator=None):
        """
        Parameters
        ----------
        generator : `TaskGenerator` or None
            the generator that was used to create this task

        """
        super(Task, self).__init__()

        self.generator = generator
        self.dependencies = None

        # registered callbacks: event name -> list of functions
        self._on = {}
        self._add_files = []

        self.stdout = None
        self.stderr = None

        self.restartable = None
        self.cleanup = None
        self.restart_failed = False

        # callbacks are stored unbound and called as `cb(task, scheduler)`
        self.add_cb('fail', self.__class__._default_fail)
        self.add_cb('success', self.__class__._default_success)

        self.state = 'created'
        self.worker = None

    def restart(self):
        """
        Mark a task as being runnable if it was stopped or failed before

        Returns
        -------
        bool
            True if the task was in a restartable state and has been reset
            to `created`

        """
        if self.state in Task.RESTARTABLE_STATES:
            self.state = 'created'
            return True

        return False

    def cancel(self):
        """
        Mark a task as cancelled if it is not running or has been halted

        Returns
        -------
        bool
            True if the task was `halted` or `created` and is now `cancelled`

        """
        if self.state in ['halted', 'created']:
            self.state = 'cancelled'
            return True

        return False

    @property
    def dependency_okay(self):
        """
        Check if all dependency tasks are successful

        Returns
        -------
        bool
            True if all dependencies are fulfilled

        """
        dependencies = self.dependencies
        if dependencies is not None:
            return all(d.state == 'success' for d in dependencies)

        return True

    @property
    def ready(self):
        """
        Check if this task is ready to be executed

        Usually this only checks dependencies but might involve more
        elaborate checks for specific Task classes

        Returns
        -------
        bool
            if True the task can now be executed

        """
        if self.dependencies:
            return self.dependency_okay

        return True

    def _default_fail(self, scheduler):
        """
        the default function executed when a task fails

        You can add your own callbacks. This is just the default

        Parameters
        ----------
        scheduler : `Scheduler`
            the calling scheduler to know where the task has failed

        """
        # todo: improve error handling
        # single-argument `print(...)` behaves the same on python 2 and 3
        print('task did not complete')

        if hasattr(scheduler, 'units'):
            unit = scheduler.units.get(self)

            if unit is not None:
                print("* %s  state %s (%s), out/err: %s / %s" % (
                    unit.uid, unit.state, unit.exit_code,
                    unit.stdout, unit.stderr))

    def _default_success(self, scheduler):
        """
        the default function executed when a task succeeds

        You can add your own callbacks. This is just the default

        Parameters
        ----------
        scheduler : `Scheduler`
            the calling scheduler to know where the task has succeeded

        """
        project_files = scheduler.project.files

        for f in self.modified_files:
            f.modified()
            project_files.add(f)

        for f in self.targets:
            f.create(scheduler)
            project_files.add(f)

    @property
    def description(self):
        """
        Return a lengthy description of the task for debugging and information

        Returns
        -------
        str
            the information text

        """
        task = self
        s = ['Task: %s(%s) [%s]' % (
            task.__class__.__name__,
            task.generator.__class__.__name__,
            task.state)]

        if task.worker:
            s += ['Worker: %s:%s' % (task.worker.hostname, task.worker.cwd)]
            s += [' cd worker.%s' % hex(task.__uuid__)]

        s += ['']
        s += ['Sources']
        s += ['- %s %s' % (x.short, '[exists]' if x.exists else '')
              for x in task.unstaged_input_files]
        s += ['Targets']
        s += ['- %s' % x.short for x in task.targets]
        s += ['Modified']
        s += ['- %s' % x.short for x in task.modified_files]

        s += ['']
        s += ['<pretask>']
        s += map(str, task.script)
        s += ['<posttask>']

        return '\n'.join(s)

    def fire(self, event, scheduler):
        """
        Fire an event like success or failed.

        Notes
        -----
        You should never have to call this yourself. The scheduler does that.

        Parameters
        ----------
        event : str
            the events name like `fail`, `success`, `submit`
        scheduler : `Scheduler`
            the scheduler that issued the events to be fired

        """
        if event in Task._events:
            for cb in self._on.get(event, []):
                cb(self, scheduler)

        # the state is only updated after all callbacks have been run
        if event in ['submit', 'fail', 'success']:
            self.state = event

    def is_done(self):
        """
        Check if the task is done executing.

        Can be failed, successful or cancelled

        Returns
        -------
        bool
            True if the task has finished its execution

        """
        return self.state in ['fail', 'success', 'cancelled']

    def was_successful(self):
        """
        Check if the task is done executing and was successful

        Returns
        -------
        bool
            True if the task has finished successfully

        """
        return self.state in ['success']

    def has_failed(self):
        """
        Check if the task is done executing and has failed

        Returns
        -------
        bool
            True if the task has finished but failed

        """
        return self.state in ['fail']

    def add_cb(self, event, cb):
        """
        Add a custom callback

        Parameters
        ----------
        event : str
            name of the event to be called upon firing. Names not listed in
            `Task._events` are ignored
        cb : function
            the function to be called. It must be a function that takes a
            task and a scheduler

        """
        if event in Task._events:
            self._on.setdefault(event, []).append(cb)

    @property
    def additional_files(self):
        """
        list of `Location`
            return the list of files created other than taken care of by
            actions. Should usually not be necessary. If you do some bad
            hacks with the bash you can add files that you transferred
            yourself to the project folders.

        """
        return self._add_files

    def add_files(self, files):
        """
        Add additional files to the task execution

        Should usually not be necessary. If you do some bad hacks with the
        bash you can add files that you transferred yourself to the project
        folders.

        Parameters
        ----------
        files : `File` or list of `File`
            the file(s) to be added to the task

        """
        if isinstance(files, File):
            self._add_files.append(files)
        elif isinstance(files, (list, tuple)):
            self._add_files += files

    def _transaction_files(self, attribute):
        """
        Collect the files named by all `FileTransaction` of the script

        Parameters
        ----------
        attribute : str
            name of the transaction attribute holding the list of files,
            i.e. `added` or `required`

        Returns
        -------
        list of `File`
            the unique non-temporary files found in the given attribute of
            all transactions plus the additional files of this task

        """
        files = []
        for action in self.script:
            if isinstance(action, FileTransaction):
                part = getattr(action, attribute)
                if part:
                    files.extend(part)

        # a real list (not a lazy `filter` object) so that the result can be
        # iterated more than once on python 3 as well
        return [x for x in set(files + self._add_files) if not x.is_temp]

    @property
    def targets(self):
        """
        Return all new and overwritten files

        Returns
        -------
        list of `File`
            the unique files that are created or overwritten by this task

        """
        return self._transaction_files('added')

    @property
    def target_locations(self):
        """
        Return a set of all new and overwritten file urls

        Returns
        -------
        set of str
            the list of file urls that are created or overwritten by this task

        """
        return {x.url for x in self.targets}

    @property
    def sources(self):
        """
        Return all required input files

        Returns
        -------
        list of `File`
            the unique files that are required by this task

        """
        # NOTE(review): the additional files are included here as well as in
        # `targets`, hence they always show up in `modified_files`. This is
        # kept as is -- confirm that it is intended
        return self._transaction_files('required')

    @property
    def source_locations(self):
        """
        Return a set of all required file urls

        Returns
        -------
        set of str
            the list of file urls that are required by this task

        """
        return {x.url for x in self.sources}

    @property
    def new_files(self):
        """
        Return a set of all files the will be newly created by this task

        Returns
        -------
        set of `File`
            the set of files that are created by this task

        """
        in_names = self.source_locations
        return {x for x in self.targets if x.url not in in_names}

    @property
    def modified_files(self):
        """
        A set of all input files whose names match output names

        These files will hence be overwritten

        Returns
        -------
        set of `File`
            the set of potentially overwritten input files

        """
        out_names = self.target_locations
        return {x for x in self.sources if x.url in out_names}

    @property
    def staged_files(self):
        """
        Set of all staged files by the tasks generator

        Returns
        -------
        set of `File`
            files that are staged by the tasks generator. Empty if the task
            has no generator

        Notes
        -----
        There might be more files stages by other generators

        """
        if self.generator is None:
            # an empty set (not an empty dict) to match the other branch
            return set()

        staged = []
        for transaction in self.generator.stage_in:
            if transaction.required:
                staged.extend(transaction.required)

        return set(staged)

    @property
    def unstaged_input_files(self):
        """
        Return a set of `File` objects that are used but not staged

        Usually a task requires some reused files from staging and specific
        others. This function lists all the files that this task will stage
        to its working directory but will not be available from the set of
        staged files of the tasks generator

        Returns
        -------
        set of `File`
            the set of `File` objects that are needed and not staged

        """
        staged = self.staged_files
        # NOTE(review): this tests url strings for membership in a set of
        # `File` objects. Whether that can ever match depends on how `File`
        # implements hashing and equality -- confirm
        return {r for r in self.sources if r.url not in staged}

    def setenv(self, key, value):
        """
        Set an environment variable for the task

        Parameters
        ----------
        key : str
            name of the environment variable
        value : str
            the value to be assigned

        Raises
        ------
        ValueError
            if the variable has already been set for this task

        """
        if self._environment is None:
            # can be None after restoring with `from_dict`
            self._environment = {key: value}
        elif key not in self._environment:
            self._environment[key] = value
        else:
            raise ValueError(
                'Cannot set same env variable `%s` more than once.' % key)

    def append(self, cmd):
        """
        Append a command to this task

        Parameters
        ----------
        cmd : str or `Action`
            the command to be added at the end of the main script

        """
        self._main.append(cmd)

    def prepend(self, cmd):
        """
        Prepend a command to this task

        Parameters
        ----------
        cmd : str or `Action`
            the command to be inserted at the beginning of the main script

        """
        self._main.insert(0, cmd)

    def get(self, f, name=None):
        """
        Get a file and make it available to the task in the main directory

        Depending on the drive of the file it is linked (`staging`,
        `sandbox`, `shared`), transferred (`file`) or copied (`worker`).

        Parameters
        ----------
        f : `File`
            the file to be made available
        name : `Location` or str
            the target name passed on to the file action

        Returns
        -------
        `File`
            the file instance of the file to be created in the unit. A file
            on the `worker` drive is returned unchanged if no `name` is given

        Raises
        ------
        ValueError
            if the drive of the file is not known

        """
        if f.drive in ['staging', 'sandbox', 'shared']:
            transaction = f.link(name)
        elif f.drive == 'file':
            transaction = f.transfer(name)
        elif f.drive == 'worker':
            if name is None:
                # already in place, nothing to be added to the script
                return f

            transaction = f.copy(name)
        else:
            raise ValueError(
                'Weird file location `%s` not sure how to get it.' %
                f.location)

        self.append(transaction)

        assert isinstance(transaction, FileTransaction)
        return transaction.target

    def touch(self, f):
        """
        Add an action to create an empty file or folder at a given location

        Parameters
        ----------
        f : `Location`
            the location (file or folder) to be used

        Returns
        -------
        `Location`
            the source of the created action

        """
        transaction = f.touch()
        self.append(transaction)
        return transaction.source

    def put(self, f, target):
        """
        Put a file back and make it persistent

        Corresponds to output_staging

        Parameters
        ----------
        f : `File`
            the file to be used
        target : str or `File`
            the target location. Need to contain a URL like `staging://` or
            `file://` for application side files

        Returns
        -------
        `Location`
            the actual target location

        """
        transaction = f.move(target)
        self.append(transaction)
        return transaction.target

    def remove(self, f):
        """
        Add an action to remove a file or folder

        Parameters
        ----------
        f : `File`
            the location to be removed

        Returns
        -------
        `Location`
            the actual location

        """
        transaction = f.remove()
        self.append(transaction)
        return transaction.source

    def add_conda_env(self, name):
        """
        Add loading a conda env to this task

        This appends `source activate {name}` to the main script

        Parameters
        ----------
        name : str
            name of the conda environment

        """
        self.append('source activate %s' % name)
try:
    _STRING_TYPES = basestring  # python 2: covers `str` and `unicode`
except NameError:
    _STRING_TYPES = str  # python 3


class PrePostTask(Task):
    """
    Special task where the script is devided into Pre/Main/Post

    Attributes
    ----------
    pre : list
        the pre part of the script. Attach actions with `.append`
    post : list
        the post part of the script. Attach actions with `.append`

    """
    _copy_attributes = Task._copy_attributes + [
        'pre', 'post'
    ]

    def __init__(self, generator=None):
        super(PrePostTask, self).__init__(generator)

        self.pre = []
        self.post = []

    @property
    def pre_exec(self):
        """
        list of str or `Action`
            the `$PATH` additions followed by the environment variables

        """
        return (
            self._format_export_paths(self.pre_add_paths) +
            self._format_environment(self.environment))

    @property
    def main(self):
        """
        list of str or `Action`
            the pre part, the main part and the post part of the script

        """
        return self.pre + self._main + self.post


class MPITask(PrePostTask):
    """
    A description for a task running on an HPC with MPI (used for RP)

    The main part of the script is a single command built from
    `executable` and `arguments`.

    Attributes
    ----------
    executable : str or None
        the executable to be called
    arguments : str or list of str or None
        the arguments passed to the executable

    """
    _copy_attributes = PrePostTask._copy_attributes + [
        'executable', 'arguments',
        'cores', 'mpi',
        'kernel', 'name'
    ]

    def __init__(self, generator=None):
        super(MPITask, self).__init__(generator)

        self.executable = None
        self.arguments = None

        self.cores = 1
        self.mpi = False

        self.kernel = None
        self.name = None

    @staticmethod
    def _quote(argument):
        """
        Wrap an argument in double quotes unless it is already quoted

        An argument counts as quoted if it starts with a single or double
        quote and ends with the same character.

        """
        # slicing instead of indexing keeps an empty argument from raising
        # an IndexError; it is then rendered as `""`
        if argument[:1] in ('"', "'") and argument[0] == argument[-1]:
            return argument

        return '"' + argument + '"'

    @property
    def command(self):
        """
        str
            the command line built from `executable` and `arguments`. A
            string of arguments is appended as is, a list of arguments is
            appended with each unquoted argument wrapped in double quotes

        """
        cmd = self.executable or ''

        if isinstance(self.arguments, _STRING_TYPES):
            cmd += ' ' + self.arguments
        elif self.arguments is not None:
            cmd += ' '
            cmd += ' '.join(self._quote(a) for a in self.arguments)

        return cmd

    def call(self, command, *args, **kwargs):
        """
        Set executable and arguments from a command template

        The command is split at single spaces and each part is formatted
        with `str.format` using the given arguments. The first part becomes
        the executable, the remaining ones the arguments.

        Parameters
        ----------
        command : str
            the command line, optionally with `str.format` placeholders
        args : ``*args``
            positional arguments for the placeholders
        kwargs : ``**kwargs``
            named arguments for the placeholders

        """
        parts = [
            part.format(*args, **kwargs) for part in command.split(' ')]

        self.executable = parts[0]
        self.arguments = parts[1:]

    @property
    def main(self):
        """
        list of str or `Action`
            the pre part, the command and the post part of the script

        """
        return self.pre + [self.command] + self.post

    def append(self, cmd):
        """
        Not supported, the main part consists only of the command

        Raises
        ------
        RuntimeWarning
            always

        """
        raise RuntimeWarning(
            'append does nothing for MPITasks. '
            'Use .pre.append or .post.append')

    def prepend(self, cmd):
        """
        Not supported, the main part consists only of the command

        Raises
        ------
        RuntimeWarning
            always

        """
        raise RuntimeWarning(
            'prepend does nothing for MPITasks. '
            'Use .pre.prepend or .post.prepend')


class DummyTask(PrePostTask):
    """
    A Task not to be executed. Only to be wrapped around other tasks

    """
    def __init__(self):
        super(DummyTask, self).__init__()
        self.state = 'dummy'

    @property
    def description(self):
        """
        str
            a text showing the pre and post part of this task

        """
        task = self
        s = ['Task: %s' % task.__class__.__name__]

        s += ['<pre>']
        s += map(str, task.pre_exec + task.pre)
        s += ['</pre>']
        s += ['<main />']
        s += ['<post>']
        s += map(str, task.post)
        s += ['</post>']

        return '\n'.join(s)


class EnclosedTask(Task):
    """
    Helper class to wrap any task with a PrePostTask

    Attributes that are not found on the instance are looked up on the
    inner task if listed in `_copies` and on the wrapper otherwise.

    """
    _copies = [
        'environment', 'stdout', 'stderr',
        'restartable', 'cleanup']

    def __init__(self, task, wrapper):
        """
        Parameters
        ----------
        task : `BaseTask`
            the inner task
        wrapper : `PrePostTask`
            the task whose pre and post part are put around the inner task

        """
        # deliberately starts the lookup after `Task`, so `Task.__init__`
        # is skipped
        super(Task, self).__init__()
        self._task = task
        self._wrapper = wrapper

    def __getattr__(self, item):
        # only called when regular lookup fails. If the delegates themselves
        # are missing (e.g. on a not yet initialized instance) looking them
        # up below would recurse forever
        if item in ('_task', '_wrapper'):
            raise AttributeError(item)

        if item in self._copies:
            return getattr(self._task, item)

        return getattr(self._wrapper, item)

    def to_dict(self):
        """
        Returns
        -------
        dict
            the inner task as `task` and the wrapping task as `wrapper`

        """
        return {
            'task': self._task,
            'wrapper': self._wrapper
        }

    @property
    def environment(self):
        """
        dict str : str
            the environment of the wrapper updated with the one of the inner
            task. The inner task wins if a variable is set in both

        """
        env = {}
        if self._wrapper.environment:
            env.update(self._wrapper.environment)
        if self._task.environment:
            env.update(self._task.environment)
        return env

    @property
    def pre_add_paths(self):
        """
        list of str
            the added paths of the wrapper followed by the ones of the task

        """
        return self._wrapper.pre_add_paths + self._task.pre_add_paths

    @classmethod
    def from_dict(cls, dct):
        """
        Create an enclosed task from a dict as returned by `to_dict`

        """
        return cls(dct['task'], dct['wrapper'])

    @property
    def main(self):
        """
        list of str or `Action`
            the main part of the inner task enclosed by the pre and post
            part of the wrapper

        """
        return self._wrapper.pre + self._task.main + self._wrapper.post
class PythonTask(PrePostTask):
    """
    A special task that does a RPC python calls

    Attributes
    ----------
    then_func_name : str or None
        the name of the function of the `TaskGenerator` to be called with
        the resulting output
    store_output : bool
        if True then the result from the RPC called function will also be
        stored in the database. It can later be retrieved using the
        `.output` attribute on the task completed successfully

    """

    _copy_attributes = PrePostTask._copy_attributes + [
        '_python_import', '_python_source_files', '_python_function_name',
        '_python_args', '_python_kwargs',
        '_rpc_input_file', '_rpc_output_file',
        'then_func_name', 'store_output']

    then_func = None

    def __init__(self, generator=None):
        """
        Parameters
        ----------
        generator : `TaskGenerator` or None
            the generator that was used to create this task

        """
        super(PythonTask, self).__init__(generator)

        self._python_import = None
        self._python_source_files = None
        self._python_function_name = None
        self._python_args = None
        self._python_kwargs = None

        self.then_func_name = 'then_func'

        # the file names carry the uuid of the task
        uuid = hex(self.__uuid__)
        self._rpc_input_file = \
            JSONFile('file://_rpc_input_%s.json' % uuid)
        self._rpc_output_file = \
            JSONFile('file://_rpc_output_%s.json' % uuid)

        # input args -> input.json
        self.pre.append(self._rpc_input_file.transfer('input.json'))

        # output args -> output.json
        self.post.append(File('output.json').transfer(self._rpc_output_file))

        # make the helper script available that executes the function call
        f = File('staging:///_run_.py')
        self.pre.append(f.link())

        self.add_cb('success', self.__class__._cb_success)
        self.add_cb('submit', self.__class__._cb_submit)

        # if True the RPC result will be stored in the DB with the task
        self.store_output = True

    def backup_output_json(self, target):
        """
        Add an action that will copy the resulting JSON file to the given path

        Parameters
        ----------
        target : `Location`
            the place to copy the resulting `output.json` file to

        """
        self.post.append(File('output.json').copy(target))

    def _cb_success(self, scheduler):
        """
        Callback on success: fetch the result, call back and clean up

        Parameters
        ----------
        scheduler : `Scheduler`
            the scheduler that fired the `success` event

        """
        # here is the logic to retrieve the result object
        # the output file is a JSON and these know how to load itself

        # NOTE(review): only the `load` call is taken to depend on
        # `store_output`; the remaining statements run unconditionally since
        # they bind `filename` and `data` which are needed below -- confirm
        if self.store_output:
            # by default store the result. If you handle it yourself you
            # might want to turn it off to not save the data twice
            self._rpc_output_file.load(scheduler)

        filename = scheduler.get_path(self._rpc_output_file)
        data = self._rpc_output_file.get(scheduler)

        generator = self.generator
        if generator is not None and hasattr(generator, self.then_func_name):
            then_func = getattr(generator, self.then_func_name)
            then_func(scheduler.project, self, data, self._python_kwargs)

        # cleanup
        # mark as changed / deleted
        os.remove(filename)
        self._rpc_output_file.modified()
        os.remove(scheduler.get_path(self._rpc_input_file))
        self._rpc_input_file.modified()

    def _cb_submit(self, scheduler):
        """
        Callback on submit: write the JSON input file of the function call

        Parameters
        ----------
        scheduler : `Scheduler`
            the scheduler that fired the `submit` event

        """
        filename = scheduler.replace_prefix(self._rpc_input_file.url)
        content = scheduler.simplifier.to_json(self._get_json(scheduler))
        with open(filename, 'w') as f:
            f.write(content)

    @property
    def output(self):
        """
        Return the data contained in the output file

        Returns
        -------
        object

        """
        return self._rpc_output_file.data

    def then(self, func_name):
        """
        Set the name of the function to be called from the generator after success

        Parameters
        ----------
        func_name : str
            the function name to be called after success

        """
        self.then_func_name = func_name

    def call(self, command, **kwargs):
        """
        Set the python function to be called with its arguments

        Parameters
        ----------
        command : function
            a python function defined inside a package or a function. If in
            a package then the package needs to be installed on the cluster
            to be called. A function defined in a local file can be called
            as long as dependencies are installed.
        kwargs : ``**kwargs``
            named arguments to the function

        """
        # `__name__` equals the python 2 only `func_name` for functions and
        # is also available on python 3
        self._python_function_name = '.'.join(
            [command.__module__, command.__name__])
        self._python_kwargs = kwargs

        self._python_import, self._python_source_files = \
            get_function_source(command)

        # transfer the source files the function needs
        for f in self._python_source_files:
            self.pre.append(File('file://' + f).load().transfer())

        # call the helper script to execute the function call
        self.append('python _run_.py')

    def _get_json(self, scheduler):
        """
        Build the description of the function call to be written as JSON

        Parameters
        ----------
        scheduler : `Scheduler`
            the scheduler used to flatten the locations in the description

        Returns
        -------
        dict
            the result of `scheduler.flatten_location` applied to the import,
            function name and named arguments of the call

        """
        dct = {
            'import': self._python_import,
            'function': self._python_function_name,
            'kwargs': self._python_kwargs
        }
        return scheduler.flatten_location(dct)