Source code for adaptivemd.worker

# adaptiveMD: A Python Framework to Run Adaptive Molecular Dynamics (MD)
#             Simulations on HPC Resources
# Copyright 2017 FU Berlin and the Authors
# Authors: Jan-Hendrik Prinz
# Contributors:
# `adaptiveMD` is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 2.1
# of the License, or (at your option) any later version.
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
# You should have received a copy of the GNU Lesser General Public
# License along with MDTraj. If not, see <http://www.gnu.org/licenses/>.

"""
Implementation of a single instance stand-alone worker to execute tasks

The main idea is that you want tasks that you created in your project to be
executed somehow. There can be several ways to do that and this _worker_
approach will allow you to run a single instance worker somewhere on your
HPC which will search your project for unfinished tasks that can be run,
assign one of these tasks to itself and execute it. Once finished, it will
continue with the next task.

The worker consists of two parts:

1. the single worker scheduler that will interpret a task, convert it to
   a bash script and run it
2. the worker job instance that runs a loop in the background, checking the DB
   for new tasks and submitting these to the scheduler for execution
"""


import os
import socket
import subprocess
import time
import sys
import random
import signal
import ctypes
import re
import shutil
from fcntl import fcntl, F_GETFL, F_SETFL

import pymongo.errors

from mongodb import StorableMixin, SyncVariable, create_to_dict, \
    ObjectSyncVariable

from scheduler import Scheduler
from reducer import StrFilterParser, WorkerParser, BashParser, PrefixParser
from logentry import LogEntry
from util import DT
from adaptivemd import Transfer

    # works on linux
    libc = ctypes.CDLL("")
except OSError:
    libc = None

class WorkerScheduler(Scheduler):
    """
    Scheduler that runs tasks one at a time as local bash subprocesses

    A task is converted to a bash script, written into a per-task folder
    below ``<shared_path>/workers`` and executed with ``/bin/bash``. The
    owner of the scheduler has to call `advance` in regular intervals to
    start queued tasks, collect stdout/stderr and finalize finished tasks.
    """

    def __init__(self, resource, verbose=False):
        """
        A single instance worker scheduler to interpret `Task` objects

        Parameters
        ----------
        resource : `Resource`
            the resource this scheduler should use.
        verbose : bool
            if True the worker will report lots of stuff

        """
        super(WorkerScheduler, self).__init__(resource)
        # the `subprocess.Popen` object of the running task (or None)
        self._current_sub = None
        # name of the folder (below `<path>/workers`) of the running task
        self._current_unit_dir = None
        self.current_task = None
        self.home_path = os.path.expanduser('~')
        # uuids of all tasks this scheduler has finished (success or fail)
        self._done_tasks = set()
        self._save_log_to_db = True
        self.verbose = verbose
        # if True the generated script starts with `set -e`
        self._fail_after_each_command = True
        # if True the task folder is removed after a successful run
        self._cleanup_successful = True

        # captured output of the running task, see `_start_std`
        self._std = {}

    @property
    def path(self):
        """
        Returns
        -------
        str
            the shared path of the resource with `$HOME` replaced by the
            home folder of the current user

        """
        return self.resource.shared_path.replace('$HOME', self.home_path)

    @property
    def staging_area_location(self):
        """
        Returns
        -------
        str
            the location (with prefix) of the staging area of the workers

        """
        return 'sandbox:///workers/staging_area'

    def task_to_script(self, task):
        """
        Convert a task to an executable bash script

        Parameters
        ----------
        task : `Task`
            the `Task` instance to be converted

        Returns
        -------
        list of str
            a list of bash commands

        """
        # create a task that wraps errands from resource and scheduler
        # NOTE(review): `_start_job` passes in an already wrapped task, so
        # the wrappers seem to be applied twice -- confirm this is intended
        wrapped_task = task >> self.wrapper >> self.project.resource.wrapper

        # call the reducer that interprets task actions
        reducer = StrFilterParser() >> PrefixParser() >> WorkerParser() >> BashParser()
        script = reducer(self, wrapped_task.script)

        if self._fail_after_each_command:
            # the bash script exits if ANY command fails not just the last one
            script = ['set -e'] + script

        return script

    def submit(self, submission):
        """
        Submit a `Task` or a `Trajectory`

        Parameters
        ----------
        submission : (list of) `Task` or `Trajectory`

        Returns
        -------
        list of `Task`
            the list of tasks actually executed after looking at all objects

        """
        tasks = self._to_tasks(submission)

        if tasks:
            for task in tasks:
                self.tasks[task.__uuid__] = task

        return tasks

    @property
    def current_task_dir(self):
        """
        Return the current path to the worker directory

        Returns
        -------
        str or None
            the path or None if no task is executed at the time

        """
        if self._current_unit_dir is not None:
            return self.path + '/workers/' + self._current_unit_dir
        else:
            return None

    def _start_job(self, task):
        """
        Start execution of a task

        Creates a clean task folder, writes the bash script of the task into
        it and runs the script in a subprocess whose stdout and stderr are
        switched to non-blocking mode.

        Parameters
        ----------
        task : `Task`
            the task to be executed

        """
        self._current_unit_dir = 'worker.%s' % hex(task.__uuid__)

        script_location = self.current_task_dir

        if os.path.exists(script_location):
            print('removing existing folder %s' % script_location)
            # the folder already exists, probably a failed previous attempt
            # a restart needs a clean folder so remove it now
            shutil.rmtree(script_location)

        # create a fresh folder
        os.makedirs(script_location)

        # and set the current directory
        os.chdir(script_location)

        task.fire('submit', self)

        script = self.task_to_script(task >> self.wrapper >> self.resource.wrapper)

        # write the script
        # NOTE(review): script file name restored from a damaged source; the
        # same name is used for writing and executing so it is self-consistent
        script_file = script_location + '/running.sh'
        with open(script_file, 'w') as f:
            f.write('\n'.join(script))

        task.state = 'running'
        # NOTE(review): argument of this event restored from a damaged
        # source -- confirm
        task.fire(task.state, self)

        if libc is not None:
            def set_pdeathsig(sig=signal.SIGTERM):
                def death_fnc():
                    # option 1 of prctl is PR_SET_PDEATHSIG: the child gets
                    # `sig` once its parent (this worker) dies
                    return libc.prctl(1, sig)

                return death_fnc

            preexec_fn = set_pdeathsig(signal.SIGTERM)
        else:
            preexec_fn = None

        self._current_sub = subprocess.Popen(
            ['/bin/bash', script_file],
            stdout=subprocess.PIPE, stderr=subprocess.PIPE,
            preexec_fn=preexec_fn, shell=False)

        # this is a special hack that allows to read from stdout and stderr
        # without a blocking `.read`
        for pipe in (self._current_sub.stdout, self._current_sub.stderr):
            flags = fcntl(pipe, F_GETFL)  # get current pipe flags
            fcntl(pipe, F_SETFL, flags | os.O_NONBLOCK)

        # prepare std catching
        self._start_std()

    def stop_current(self):
        """
        Stop execution of the current task immediately

        The subprocess is killed, its remaining output is collected and the
        scheduler is reset so that no task is marked as current anymore.

        Returns
        -------
        bool
            if True the current task was cancelled, False if there was
            no task running

        """
        if self._current_sub is None:
            return False

        task = self.current_task
        self._current_sub.kill()
        del self.tasks[task.__uuid__]
        self._final_std()
        # also forget the killed subprocess and its folder, otherwise a
        # second call would try to stop a task that does not exist anymore
        self._initialize_current()
        return True

    def _start_std(self):
        """
        Reset the buffers that collect stdout and stderr of the running task
        """
        self._std = {
            'stdout': '',
            'stderr': ''
        }

    def _advance_std(self):
        """
        Advance the stdout and stderr for some bytes, save it and redirect
        """
        for s in ['stdout', 'stderr']:
            try:
                new_std = os.read(getattr(self._current_sub, s).fileno(), 1024)
                self._std[s] += new_std
                if self.verbose:
                    # send to stdout, stderr
                    std = getattr(sys, s)
                    std.write(new_std)
                    std.flush()
            except OSError:
                # the pipes are non-blocking, so reading without available
                # data raises. Just try again on the next call
                pass

    def _final_std(self):
        """
        Finish capturing of stdout and stderr

        The remaining output is read and, if enabled, the complete output is
        stored as log entries in the project and attached to the task.
        """
        task = self.current_task

        try:
            out, err = self._current_sub.communicate()

            if self.verbose:
                sys.stderr.write(err)
                sys.stdout.write(out)

            # save full message
            stdout = self._std['stdout'] + out
            stderr = self._std['stderr'] + err

            if self._save_log_to_db:
                log_err = LogEntry(
                    'worker',
                    'stderr from running task',
                    stderr
                )
                log_out = LogEntry(
                    'worker',
                    'stdout from running task',
                    stdout
                )
                self.project.logs.add(log_err)
                self.project.logs.add(log_out)

                task.stdout = log_out
                task.stderr = log_err

        except ValueError:
            pass

    def advance(self):
        """
        Advance checking if tasks are completed or failed

        Needs to be called in regular intervals. Usually by the main
        worker instance

        """
        if self.current_task is None:
            if len(self.tasks) > 0:
                t = next(iter(self.tasks.values()))
                self.current_task = t
                self._start_job(t)
        else:
            task = self.current_task
            # get current outputs
            return_code = self._current_sub.poll()

            # update current stdout and stderr by 1024 bytes
            self._advance_std()

            if return_code is not None:
                # finish std catching
                self._final_std()

                if return_code == 0:
                    # success
                    all_files_present = True

                    # see first if we have all claimed files for worker
                    # output staging transfer
                    for f in task.targets:
                        if isinstance(f, Transfer):
                            if not os.path.exists(self.replace_prefix(f.source.url)):
                                log = LogEntry(
                                    'worker',
                                    'execution error',
                                    'failed to create file before staging %s' % f.source.short,
                                    objs={'file': f, 'task': task}
                                )
                                self.project.logs.add(log)
                                all_files_present = False

                    if all_files_present:
                        try:
                            task.fire('success', self)
                            task.state = 'success'
                            print('task succeeded')
                            if self._cleanup_successful:
                                print('removing worker dir')
                                # go to an existing folder before we delete
                                os.chdir(self.path)
                                script_location = self.current_task_dir
                                if script_location is not None:
                                    shutil.rmtree(script_location)
                        except IOError:
                            task.state = 'fail'
                    else:
                        task.state = 'fail'
                else:
                    # failed
                    log = LogEntry(
                        'worker',
                        'task failed',
                        'see log files',
                        objs={'task': task}
                    )
                    self.project.logs.add(log)
                    # transient marker while the fail event is fired, the
                    # final state set below is `fail`
                    task.state = 'failed'
                    try:
                        task.fire('fail', self)
                    except IOError:
                        pass

                    task.state = 'fail'

                del self.tasks[task.__uuid__]
                self._done_tasks.add(task.__uuid__)
                self._initialize_current()

    def release_queued_tasks(self):
        """
        Release captured tasks scheduled for execution (if not started yet)

        You can prefetch tasks (although not recommended for single workers)
        and this releases not started jobs back to the queue

        """
        # iterate a copy since entries are deleted while looping
        for t in list(self.tasks.values()):
            if t.state == 'queued':
                t.state = 'created'
                t.worker = None
                del self.tasks[t.__uuid__]

    def _initialize_current(self):
        """
        Forget the current subprocess, task folder and task
        """
        self._current_sub = None
        self._current_unit_dir = None
        self.current_task = None

    def enter(self, project=None):
        """
        Boot the scheduler: create the folders and stage the generators

        Parameters
        ----------
        project : `Project` or None
            if given, the project this scheduler will work for

        """
        self.change_state('booting')
        if project is not None:
            self.project = project
            # register this cluster with the session for later cleanup
            self.project.schedulers.add(self)

        # create main folders. make sure we can save project files
        self.stage_project()
        self.stage_generators()

        self.change_state('running')

    def stage_project(self):
        """
        Create paths necessary for the current project

        """
        project_path = self.path + '/projects/' + self.project.name
        paths = [
            self.path + '/projects/',
            project_path,
            project_path + '/trajs',
            project_path + '/models'
        ]

        self._create_dirs(paths)

        paths = [
            self.path + '/workers',
            self.path + '/workers/staging_area'
        ]

        self._create_dirs(paths)

    @staticmethod
    def _create_dirs(paths):
        """
        Create all given folders and ignore the ones that cannot be created

        Parameters
        ----------
        paths : list of str
            the folders to be created, parents first

        """
        for p in paths:
            try:
                os.makedirs(p)
            except OSError:
                # best effort, usually the folder exists already
                pass

    def stage_generators(self):
        """
        Run the `stage_in` actions of all generators in the staging area

        A failing attempt (`OSError`) is repeated up to 10 times after a
        random pause of less than a second.
        """
        os.chdir(self.path + '/workers/staging_area/')

        reducer = StrFilterParser() >> PrefixParser() >> WorkerParser() >> BashParser()

        retries = 10
        while retries > 0:
            try:
                # todo: add staging that does some file copying as well
                for g in self.generators:
                    reducer(self, g.stage_in)

                retries = 0
            except OSError:
                time.sleep(random.random())
                retries -= 1

    def replace_prefix(self, path):
        """
        Replace the location prefixes in a path by real paths on the worker

        Parameters
        ----------
        path : str
            the path (or command) containing prefixed locations

        Returns
        -------
        str
            the path with `file://` locations mapped to the worker folder
            and all default replacements of the base class applied

        """
        # on a worker all runs on the remote side, so if we talk about file://
        # locations we actually want them to work, once they are transferred.
        # There are only two ways this is supported (yet). Either the file is
        # in the DB then we do not care about the file location. The other
        # case is, if the task generates it on the file side and then
        # transfers it. The trick we use is to just create the file directly
        # on the remote side and do the link as usual. This requires to alter
        # a file:// path to be on the remote side.

        # replace any occurrence of `file://a/b/c/d/something` with
        # `worker://_file_something`
        path = re.sub(r"(file://[^ ]*/)([^ /]*)", r"worker://_file_\2", path)

        # call the default replacements
        path = super(WorkerScheduler, self).replace_prefix(path)
        return path

    def shut_down(self, wait_to_finish=True):
        """
        Release all queued tasks, stop the running task and go `down`

        Parameters
        ----------
        wait_to_finish : bool
            if True the running task gets up to 15 seconds to finish before
            it is stopped

        """
        self.change_state('releaseunfinished')
        self.release_queued_tasks()

        if wait_to_finish:
            self.change_state('waitcurrent')
            curr = time.time()
            max_wait = 15
            while len(self.tasks) > 0 and time.time() - curr < max_wait:
                self.advance()
                time.sleep(2.0)

        # kill the current job
        self.change_state('shuttingdown')
        if self.current_task:
            # hand the unfinished task back so it can be picked up again
            self.current_task.state = 'created'
            self.stop_current()

        self.change_state('down')
class Worker(StorableMixin):
    """
    A Worker instance that will submit tasks from the DB to a scheduler

    The worker is stored in the DB. Attributes declared as `SyncVariable`
    are the ones other processes can use to watch the worker (`state`,
    `seen`, `n_tasks`, `current`) or to control it (`command`).
    """

    _find_by = ['state', 'n_tasks', 'seen', 'verbose', 'prefetch', 'current']

    state = SyncVariable('state')
    n_tasks = SyncVariable('n_tasks')
    seen = SyncVariable('seen')
    verbose = SyncVariable('verbose')
    prefetch = SyncVariable('prefetch')
    command = SyncVariable('command')
    current = ObjectSyncVariable('current', 'tasks')

    def __init__(self, walltime=None, generators=None, sleep=None,
                 heartbeat=None, prefetch=1, verbose=False):
        """
        Parameters
        ----------
        walltime : float or None
            if set, the worker shuts down once it is older than this
            number of seconds
        generators : list of str or None
            if set, only tasks of generators with one of these names are run
        sleep : float
            seconds to sleep between two iterations of the main loop
        heartbeat : float
            minimal seconds between two updates of `seen`
        prefetch : int
            number of attempts per iteration to queue a task when idle
        verbose : bool
            if True the output of the tasks is echoed by the worker

        """
        super(Worker, self).__init__()
        self.hostname = socket.gethostname()
        self.cwd = os.getcwd()
        self.seen = time.time()
        self.walltime = walltime
        self.generators = generators
        self.sleep = sleep
        self.heartbeat = heartbeat
        self.prefetch = prefetch
        # seconds to wait before reconnecting after a lost DB connection
        self.reconnect_time = 10
        self._scheduler = None
        self._project = None
        self.command = None
        self.verbose = verbose
        self.current = None
        self._last_current = None
        self.pid = os.getpid()

    to_dict = create_to_dict([
        'walltime', 'generators', 'sleep', 'heartbeat', 'hostname', 'cwd',
        'seen', 'prefetch', 'pid'
    ])

    @classmethod
    def from_dict(cls, dct):
        """
        Recreate a worker from its dict representation

        Parameters
        ----------
        dct : dict
            the dict as created by `to_dict`

        Returns
        -------
        `Worker`
            the restored worker with the stored host information

        """
        obj = super(Worker, cls).from_dict(dct)
        # these are set from the local machine in `__init__` and need to be
        # replaced by the stored values
        obj.hostname = dct['hostname']
        obj.cwd = dct['cwd']
        obj.seen = dct['seen']
        obj.pid = dct['pid']
        return obj

    def create(self, project):
        """
        Create the scheduler for a project and boot it

        Parameters
        ----------
        project : `Project`
            the project this worker should execute tasks for

        """
        scheduler = WorkerScheduler(project.resource, self.verbose)
        scheduler._state_cb = self._state_cb
        self._scheduler = scheduler
        self._project = project
        scheduler.enter(project)

    def _state_cb(self, scheduler):
        """
        Callback that copies the state of the scheduler to the worker
        """
        self.state = scheduler.state

    @property
    def scheduler(self):
        """
        Returns
        -------
        `WorkerScheduler`
            the currently used scheduler to execute tasks

        """
        return self._scheduler

    @property
    def project(self):
        """
        Returns
        -------
        `Project`
            the currently used project

        """
        return self._project

    # states in which the scheduler is advanced in the main loop
    _running_states = ['running', 'waitandshutdown']
    _accepting_states = ['running']

    def _stop_current(self, mode):
        """
        Stop the running task and mark it with the given state

        Parameters
        ----------
        mode : str
            the state the stopped task is set to, e.g. `halted`

        """
        sc = self.scheduler
        task = sc.current_task

        if not task:
            return

        # only stop the task if it is still marked as running in the DB
        attempt = self.project.tasks.modify_test_one(
            lambda x: x == task, 'state', 'running', 'stopping')

        if attempt is not None:
            if sc.stop_current():
                # success, so mark the task as cancelled
                task.state = mode
                task.worker = None
                print('stopped a task [%s] from generator `%s` and set to `%s`' % (
                    task.__class__.__name__,
                    task.generator.name if task.generator else '---',
                    task.state))
        # otherwise it seems in the meantime the task has finished
        # (success/fail) and there is nothing to stop

    def execute(self, command):
        """
        Send and execute a single command to the worker

        Note that the worker is registered on the DB but running on your HPC.
        Just loading it does not allow you to call functions like `.shutdown`.
        These would only be called on your local instance. All you can do is
        use `execute` which will store a command in the DB. Once the real
        running worker has executed it, the command is cleared from the DB.

        Parameters
        ----------
        command : str
            the command to be executed

        """
        self.command = command

    def run(self):
        """
        Start the worker to execute tasks until it is shut down

        """
        scheduler = self._scheduler
        project = self._project

        last = time.time()
        last_n_tasks = 0
        self.seen = last

        def task_test(x):
            # a task is accepted if it is ready and, in case the worker is
            # restricted to generators, comes from one of these
            return x.ready and (not self.generators or (
                hasattr(x.generator, 'name') and x.generator.name in self.generators))

        print('up and running ...')

        try:
            reconnect = True
            while reconnect:
                reconnect = False
                try:
                    if len(scheduler.tasks) > 0:
                        # must have been a DB connection problem,
                        # attempt reconnection
                        print('attempt reconnection')
                        self._project.reconnect()

                        print('remove all pending tasks')
                        # remove all pending tasks as much as possible
                        for t in list(scheduler.tasks.values()):
                            if t is not scheduler.current_task:
                                if t.worker == self:
                                    t.state = 'created'
                                    t.worker = None

                                del scheduler.tasks[t.__uuid__]

                        # see, if we can salvage the currently running task
                        # unless it has been cancelled and is running with
                        # another worker
                        t = scheduler.current_task
                        # there is nothing to salvage if only queued tasks
                        # were pending when the connection was lost
                        if t is not None:
                            if t.worker == self and t.state == 'running':
                                # seems like the task is still ours to finish
                                print('continuing current task')
                            else:
                                print('current task has been captured. releasing.')
                                scheduler.stop_current()

                    # the main worker loop
                    while scheduler.state != 'down':
                        state = self.state
                        # check the state of the worker
                        if state in self._running_states:
                            scheduler.advance()
                            if scheduler.is_idle:
                                for _ in range(self.prefetch):
                                    tasklist = scheduler(
                                        project.tasks.modify_test_one(
                                            task_test, 'state', 'created', 'queued'))

                                    for task in tasklist:
                                        task.worker = self
                                        print('queued a task [%s] from generator `%s`' % (
                                            task.__class__.__name__,
                                            task.generator.name if task.generator else '---'))

                                self.n_tasks = len(scheduler.tasks)

                        # handle commands
                        # todo: Place all commands in a separate store and consume ?!?
                        command = self.command

                        if command == 'shutdown':
                            # someone wants us to shutdown
                            scheduler.shut_down()
                        elif command == 'kill':
                            # someone wants us to shutdown immediately.
                            # No waiting
                            scheduler.shut_down(False)
                        elif command == 'release':
                            scheduler.release_queued_tasks()
                        elif command == 'halt':
                            self._stop_current('halted')
                        elif command == 'cancel':
                            self._stop_current('cancelled')
                        elif command and command.startswith('!'):
                            # NOTE(review): this runs whatever is stored in
                            # the DB, so write access to the DB means
                            # execution rights on the worker host
                            try:
                                result = subprocess.check_output(command[1:].split(' '))
                            except (subprocess.CalledProcessError, OSError) as e:
                                # a broken command must not take the worker
                                # down, report the problem in the log instead
                                result = 'command failed: %s' % e

                            project.logs.add(
                                LogEntry(
                                    'command', 'called `%s` on worker' % command[1:], result
                                )
                            )

                        if command:
                            # mark the command as consumed
                            self.command = None

                        if time.time() - last > self.heartbeat:
                            # heartbeat
                            last = time.time()
                            self.seen = last

                        time.sleep(self.sleep)
                        if self.walltime and time.time() - self.__time__ > self.walltime:
                            # we have reached the set walltime and will shutdown
                            print('hit walltime of %s' % DT(self.walltime).length)
                            scheduler.shut_down()

                        # only write to the DB if something has changed
                        if scheduler.current_task is not self._last_current:
                            self.current = scheduler.current_task
                            self._last_current = self.current

                        n_tasks = len(scheduler.tasks)
                        if n_tasks != last_n_tasks:
                            self.n_tasks = n_tasks
                            last_n_tasks = n_tasks

                except (pymongo.errors.ConnectionFailure,
                        pymongo.errors.AutoReconnect) as e:
                    print('pymongo connection error %s' % (e,))
                    print('try reconnection after %d seconds' % self.reconnect_time)

                    # lost connection to DB, try to reconnect after some time
                    time.sleep(self.reconnect_time)
                    reconnect = True

        except KeyboardInterrupt:
            scheduler.shut_down()

    def shutdown(self, gracefully=True):
        """
        Shut down the worker

        Parameters
        ----------
        gracefully : bool
            if True the worker is allowed some time to finish running tasks

        """
        self._scheduler.shut_down(gracefully)