##############################################################################
# adaptiveMD: A Python Framework to Run Adaptive Molecular Dynamics (MD)
# Simulations on HPC Resources
# Copyright 2017 FU Berlin and the Authors
#
# Authors: Jan-Hendrik Prinz
# Contributors:
#
# `adaptiveMD` is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as
# published by the Free Software Foundation, either version 2.1
# of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with adaptiveMD. If not, see <http://www.gnu.org/licenses/>.
##############################################################################
# Part of the code below was taken from `openpathsampling`; see
# <http://www.openpathsampling.org> or
# <http://github.com/openpathsampling/openpathsampling>
# for details and license.
import base64
import importlib
import numpy as np
import math
import abc
from uuid import UUID
import ujson
import marshal
import types
import opcode
import __builtin__
from base import StorableMixin
__author__ = 'Jan-Hendrik Prinz'
class ObjectJSON(object):
    """
    A simple pickle-like algorithm that reduces objects to JSON and back.

    `simplify` turns an object into a structure made only of dicts, lists,
    strings and numbers. Values that JSON cannot express directly are wrapped
    in single-purpose marker dicts (`_tuple`, `_slice`, `_numpy`, `_float`,
    `_uuid`, `_type`, `_import`, `_cls`/`_dict`, ...). `build` is the inverse
    and reconstructs the objects from these markers.

    Notes
    -----
    This is Python 2 code: it relies on `long`, `unicode`, `dict.iteritems`,
    `__builtin__` and the `func_code`/`func_name` function attributes.
    """

    # if True, plain functions may be stored as marshalled bytecode
    allow_marshal = True

    # switch to `true`, if you want more protection
    prevent_unsafe_modules = False

    # builtin and numpy scalar types that may be stored by reference
    # (see the `_type` marker)
    allowed_storable_atomic_types = [
        int, float, bool, long, str,
        np.float32, np.float64,
        np.int8, np.int16, np.int32, np.int64,
        np.uint8, np.uint16, np.uint32, np.uint64,
    ]

    # root packages whose modules and callables may be stored by name and
    # re-imported on load
    safe_modules = [
        'numpy',
        'math',
        'pandas',
        'mdtraj',
        'simtk',
        'simtk.unit',
        'simtk.openmm'
    ]

    def __init__(self, unit_system=None):
        """
        Parameters
        ----------
        unit_system : object or None
            stored as `self.unit_system`. Nothing in this class reads it at
            the moment; (de)serialization of united quantities is disabled.
        """
        # dict keys that `simplify` silently drops
        self.excluded_keys = []
        self.unit_system = unit_system
        self.class_list = dict()
        self.allowed_storable_types = dict()
        self.type_names = {}
        self.type_classes = {}

        self.update_class_list()

    def update_class_list(self):
        """
        Refresh the name <-> class lookup tables.

        `class_list` is taken from `StorableMixin.objects()`; `type_names`
        additionally contains the atomic types, and `type_classes` is the
        inverse mapping (class -> name).
        """
        self.class_list = StorableMixin.objects()
        self.type_names = {
            cls.__name__: cls for cls in self.allowed_storable_atomic_types}
        self.type_names.update(self.class_list)
        self.type_classes = {
            cls: name for name, cls in self.type_names.iteritems()}

    def simplify_object(self, obj):
        """
        Simplify a storable object without adding its UUID.

        Parameters
        ----------
        obj : object
            must provide `to_dict()` and `base_cls_name`

        Returns
        -------
        dict
            `{'_cls': <class name>, '_dict': <simplified to_dict()>}`
        """
        return {
            '_cls': obj.__class__.__name__,
            '_dict': self.simplify(obj.to_dict(), obj.base_cls_name)
        }

    def simplify(self, obj, base_type=''):
        """
        Reduce `obj` to a JSON-compatible structure.

        Parameters
        ----------
        obj : object
            the object to be simplified
        base_type : str
            only forwarded to recursive calls for list/tuple elements and
            `to_dict()` results; not otherwise interpreted here

        Returns
        -------
        object
            a structure of dicts, lists and atomic values. Non-builtin
            objects that cannot be handled, and types that are not
            registered, become `None`.

        Raises
        ------
        RuntimeError
            if `obj` is a module whose root package is not in `safe_modules`
        """
        if obj.__class__.__name__ == 'module':
            # store an imported module by name, if its root package is safe
            if obj.__name__.split('.')[0] in self.safe_modules:
                return {'_import': obj.__name__}
            else:
                raise RuntimeError((
                    'The module reference "%s" you want to store is '
                    'not allowed!') % obj.__name__)

        elif type(obj) is type or type(obj) is abc.ABCMeta:
            # store a reference to a registered type by its name
            if obj in self.type_classes:
                return {'_type': obj.__name__}
            else:
                return None

        elif type(obj) is float and math.isinf(obj):
            # JSON has no infinity literal, so keep it as a string
            return {'_float': str(obj)}

        elif obj.__class__.__module__ != '__builtin__':
            # NOTE: (de)serialization of united quantities is disabled
            if obj.__class__ is np.ndarray:
                # this is maybe not the best way to store large numpy arrays!
                return {
                    '_numpy': self.simplify(obj.shape),
                    '_dtype': str(obj.dtype),
                    '_data': base64.b64encode(obj.copy(order='C'))
                }

            elif hasattr(obj, 'to_dict'):
                # the object knows how to dismantle itself
                simple = {
                    '_cls': obj.__class__.__name__,
                    '_dict': self.simplify(obj.to_dict(), base_type)}
                if hasattr(obj, '__uuid__'):
                    simple['_obj_uuid'] = str(UUID(int=obj.__uuid__))
                return simple

            elif type(obj) is UUID:
                # `obj` already is a UUID, so its string form is what we need
                return {'_uuid': str(obj)}

            # numpy scalars are converted to python scalars. The abstract
            # base classes are used so that every width (int8 ... uint64,
            # float16 ... float64) is covered and not only the default one.
            elif isinstance(obj, np.bool_):
                return bool(obj)

            elif isinstance(obj, np.integer):
                return int(obj)

            elif isinstance(obj, np.floating):
                # todo: this might be dangerous - potential loss of accuracy
                # go through `simplify` again so that infinities are wrapped
                return self.simplify(float(obj), base_type)

            else:
                return None

        elif type(obj) is list:
            return [self.simplify(o, base_type) for o in obj]

        elif type(obj) is tuple:
            return {'_tuple': [self.simplify(o, base_type) for o in obj]}

        elif type(obj) is dict:
            # we want to support storable objects as keys so we need to wrap
            # dicts with care and store them using tuples
            only_simple_keys = all(
                type(key) is str or type(key) is int for key in obj)

            if only_simple_keys:
                return {
                    key: self.simplify(o) for key, o in obj.iteritems()
                    if key not in self.excluded_keys
                }

            # other keys than int or str: store a list of (key, value) pairs
            return {
                '_dict': [
                    self.simplify(tuple([key, o]))
                    for key, o in obj.iteritems()
                    if key not in self.excluded_keys
                ]}

        elif type(obj) is slice:
            return {
                '_slice': [obj.start, obj.stop, obj.step]}

        else:
            # builtin atomic value, pass through unchanged
            return obj

    @staticmethod
    def _unicode2str(s):
        """Return `s` utf8-encoded if it is `unicode`, otherwise unchanged."""
        if type(s) is unicode:
            return s.encode('utf8')
        else:
            return s

    def build(self, obj):
        """
        Reconstruct an object from its simplified representation.

        Parameters
        ----------
        obj : object
            a structure as returned by `simplify` (after a JSON round trip
            strings may be `unicode`)

        Returns
        -------
        object
            the rebuilt object. A `_uuid` marker is returned as an integer,
            an `_import` of a module outside of `safe_modules` as `None`.

        Raises
        ------
        ValueError
            if a `_cls` marker names a class that is not registered, even
            after refreshing the class list
        """
        if type(obj) is dict:
            if '_slice' in obj:
                return slice(*obj['_slice'])

            elif '_numpy' in obj:
                return np.frombuffer(
                    base64.b64decode(obj['_data']),
                    dtype=np.dtype(obj['_dtype'])).reshape(
                        self.build(obj['_numpy'])
                )

            elif '_float' in obj:
                return float(str(obj['_float']))

            elif '_integer' in obj:
                # `simplify` does not emit this marker; kept so that data
                # carrying it can still be read
                return float(str(obj['_integer']))

            elif '_uuid' in obj:
                return int(UUID(obj['_uuid']))

            elif '_cls' in obj and '_dict' in obj:
                if obj['_cls'] not in self.class_list:
                    # the class might have been defined after the last update
                    self.update_class_list()
                    if obj['_cls'] not in self.class_list:
                        # updating did not help, so there is nothing we can do.
                        raise ValueError((
                            'Cannot create obj of class `%s`.\n' +
                            'Class is not registered as creatable! '
                            'You might have to define\n' +
                            'the class locally and call '
                            '`update_storable_classes()` on your storage.') %
                            obj['_cls'])

                attributes = self.build(obj['_dict'])
                ret = self.class_list[obj['_cls']].from_dict(attributes)
                if '_obj_uuid' in obj:
                    ret.__uuid__ = int(UUID(obj['_obj_uuid']))

                return ret

            elif '_tuple' in obj:
                return tuple([self.build(o) for o in obj['_tuple']])

            elif '_type' in obj:
                # return a type of a _built-in_ `storable` type
                return self.type_names.get(obj['_type'])

            elif '_dict' in obj:
                # a dict with non-trivial keys, stored as (key, value)
                # pairs; building the list already builds keys and values
                return {
                    self._unicode2str(key): value
                    for key, value in self.build(obj['_dict'])
                }

            elif '_import' in obj:
                module = obj['_import']
                if module.split('.')[0] in self.safe_modules:
                    return importlib.import_module(module)
                else:
                    return None

            elif '_marshal' in obj or '_module' in obj:
                return self.callable_from_dict(obj)

            else:
                return {
                    self._unicode2str(key): self.build(o)
                    for key, o in obj.iteritems()
                }

        elif type(obj) is list:
            return [self.build(o) for o in obj]

        elif type(obj) is unicode:
            return self._unicode2str(obj)

        else:
            return obj

    @staticmethod
    def unit_to_symbol(unit):
        """
        Return the second whitespace-separated token of `str(1.0 * unit)`.

        Parameters
        ----------
        unit : object
            a unit that can be multiplied by a float
            (presumably a `simtk.unit` unit -- TODO confirm)

        Returns
        -------
        str
            the token following the numeric value
        """
        return str(1.0 * unit).split()[1]

    @staticmethod
    def callable_to_dict(c):
        """
        Turn a callable function of class into a dictionary

        Used for conversion to JSON

        Parameters
        ----------
        c : callable (function or class with __call__)
            the function to be turned into a dict representation

        Returns
        -------
        dict
            the dict representation of the callable. Either a reference
            (`_module`, `_name`) for callables from `safe_modules`, or the
            marshalled bytecode (`_marshal`, `_global_vars`, `_module_vars`).

        Raises
        ------
        RuntimeError
            if the function uses non-builtin globals, if it imports modules
            that are not safe while `prevent_unsafe_modules` is set, or if
            it can neither be referenced nor marshalled
        """
        f_module = c.__module__
        root_module = f_module.split('.')[0]

        # try saving known external classes of functions, e.g. `msmbuilder`
        if root_module in ObjectJSON.safe_modules:
            # only store the function/class and the module
            return {
                '_module': c.__module__,
                '_name': c.__name__
            }

        # if the easy way did not work, try saving it using bytecode
        if ObjectJSON.allow_marshal and callable(c):
            # use marshal
            global_vars = ObjectJSON._find_var(c, opcode.opmap['LOAD_GLOBAL'])
            import_vars = ObjectJSON._find_var(c, opcode.opmap['IMPORT_NAME'])

            builtins = dir(__builtin__)

            # builtins are available everywhere and need not be reported
            global_vars = list(set(
                [var for var in global_vars if var not in builtins]))
            import_vars = list(set(import_vars))

            err = ''

            if len(global_vars) > 0:
                err += 'The function you try to save relies on globally set ' \
                       'variables and these cannot be saved since storage ' \
                       'has no access to the global scope which includes ' \
                       'imports! \n\n'
                err += 'We require that the following globals: ' + \
                       str(global_vars) + ' either\n'
                err += '\n1. be replaced by constants'
                err += '\n2. be defined inside your function,' + \
                       '\n\n' + '\n'.join(
                           [' ' * 8 + x + '= ...' for x in global_vars]
                       ) + '\n'
                err += '\n3. imports need to be "re"-imported inside your ' \
                       'function' + \
                       '\n\n' + '\n'.join(
                           [' ' * 8 + 'import ' + x for x in global_vars]
                       ) + '\n'
                err += '\n4. be passed as an external parameter ' \
                       '(not for imports!)'
                err += '\n\n my_cv = FunctionCV("cv_name", ' + \
                       c.func_name + ', \n' + \
                       ',\n'.join(
                           [' ' * 20 + x + '=' + x for x in global_vars]
                       ) + ')' + '\n'
                err += '\n and change your function definition like this'
                err += '\n\n def ' + \
                       c.func_name + '(snapshot, ..., ' + \
                       '\n' + ',\n'.join(
                           [' ' * 16 + x for x in global_vars]
                       ) + '):'

            # a module is safe if its root package is; this is the same rule
            # that is applied to module references and callables elsewhere
            unsafe_modules = [
                module for module in import_vars
                if module.split('.')[0] not in ObjectJSON.safe_modules
            ]

            if ObjectJSON.prevent_unsafe_modules and len(unsafe_modules) > 0:
                if len(err) > 0:
                    err += '\n\n'

                err += 'The function you try to save requires the following' \
                       ' modules to be installed: ' + str(unsafe_modules) + \
                       ' which are not marked as safe! '
                err += 'You can change the list of safe modules using '
                err += '\n\n ObjectJSON.safe_modules.extend(['
                err += '\n' + ',\n'.join(
                    [' ' * 12 + x for x in unsafe_modules]
                )
                err += '\n ])'
                err += '\n\n'
                err += 'include the import statement in your function like'
                err += '\n\n' + '\n'.join(
                    [' ' * 8 + 'import ' + v for v in unsafe_modules])

            if len(err) > 0:
                raise RuntimeError('Cannot store function! \n\n' +
                                   word_wrap(err, 60))

            return {
                '_marshal': base64.b64encode(
                    marshal.dumps(c.func_code)),
                '_global_vars': global_vars,
                '_module_vars': import_vars
            }

        raise RuntimeError('Locally defined classes are not storable yet')

    @staticmethod
    def callable_from_dict(c_dict):
        """
        Turn a dictionary back in a callable function or class

        Used for conversion from JSON

        Parameters
        ----------
        c_dict : dict
            the dictionary that contains the information

        Returns
        -------
        callable
            the reconstructed callable function or class. `None` if `c_dict`
            is `None`, if marshalling is disabled, or if the referenced
            module is not in `safe_modules`.
        """
        c = None

        if c_dict is not None:
            if '_marshal' in c_dict:
                if ObjectJSON.allow_marshal:
                    # NOTE(security): this turns stored bytes into code that
                    # runs when the result is called; only load data from a
                    # trusted source or set `allow_marshal = False`
                    code = marshal.loads(base64.b64decode(c_dict['_marshal']))
                    c = types.FunctionType(code, globals(), code.co_name)

            elif '_module' in c_dict:
                module = c_dict['_module']
                packages = module.split('.')
                if packages[0] in ObjectJSON.safe_modules:
                    imp = importlib.import_module(module)
                    c = getattr(imp, c_dict['_name'])

        return c

    @staticmethod
    def _find_var(code, op):
        """
        Helper function to search in python bytecode for specific function calls

        The bytecode is read in the Python 2 layout: an opcode below
        `opcode.HAVE_ARGUMENT` takes one byte, every other opcode is followed
        by a two byte little-endian argument. `EXTENDED_ARG` is not handled.

        Parameters
        ----------
        code : function
            the function whose bytecode (`func_code`) is searched
        op : int
            the int code of the code to be found

        Returns
        -------
        list of func_code.co_names
            a list of co_names used in this function when calling op
        """
        # TODO: Clean this up. It now works only for codes that use co_names
        opcodes = code.func_code.co_code
        i = 0
        ret = []
        while i < len(opcodes):
            int_code = ord(opcodes[i])
            if int_code == op:
                # remember position and the index into `co_names`
                ret.append((i, ord(opcodes[i + 1]) + ord(opcodes[i + 2]) * 256))

            if int_code < opcode.HAVE_ARGUMENT:
                i += 1
            else:
                i += 3

        return [code.func_code.co_names[i[1]] for i in ret]

    def to_json(self, obj, base_type=''):
        """Return the JSON string of the simplified `obj`."""
        simplified = self.simplify(obj, base_type)
        return ujson.dumps(simplified)

    def to_json_object(self, obj):
        """
        Return the JSON string of `obj`, storing instances by content.

        Instances (not classes) that have a `base_cls` attribute are
        simplified with `simplify_object`, everything else with `simplify`.
        """
        if hasattr(obj, 'base_cls') \
                and type(obj) is not type and type(obj) is not abc.ABCMeta:
            simplified = self.simplify_object(obj)
        else:
            simplified = self.simplify(obj)
        return ujson.dumps(simplified)

    def from_json(self, json_string):
        """Parse `json_string` and rebuild the object it describes."""
        simplified = ujson.loads(json_string)
        return self.build(simplified)

    def from_simple_dict(self, simplified):
        """
        Rebuild an object from a dict created by `to_simple_dict`.

        Parameters
        ----------
        simplified : dict
            the stored document; `_id` is required, `_time`, `name` and the
            keys listed in the object's `_find_by` are optional

        Returns
        -------
        object
            the rebuilt object with `__uuid__`, `__time__` and the optional
            attributes restored
        """
        obj = self.build(simplified)

        obj.__uuid__ = int(UUID(simplified.get('_id')))
        obj.__time__ = simplified.get('_time', 0)  # use time or 0 if unset
        if 'name' in simplified:
            obj.name = simplified['name']

        for key in obj._find_by:
            if key in simplified:
                setattr(obj, key, self.build(simplified[key]))

        return obj

    def to_simple_dict(self, obj, base_type=''):
        """
        Create a flat document for `obj` with top-level search keys.

        Parameters
        ----------
        obj : object
            a storable object providing `to_dict()`, `__uuid__`, `__time__`
            and `_find_by`
        base_type : str
            forwarded to `simplify`

        Returns
        -------
        dict
            the simplified object plus `_id`, `_time`, and, if present,
            `name` and every attribute named in `obj._find_by`
        """
        dct = {
            '_cls': obj.__class__.__name__,
            '_obj_uuid': str(UUID(int=obj.__uuid__)),
            '_dict': self.simplify(obj.to_dict(), base_type),
            '_id': str(UUID(int=obj.__uuid__)),
            '_time': int(obj.__time__)}

        if hasattr(obj, 'name'):
            dct['name'] = obj.name

        for key in obj._find_by:
            if hasattr(obj, key):
                dct[key] = self.simplify(getattr(obj, key))

        return dct
class UUIDObjectJSON(ObjectJSON):
    """
    An `ObjectJSON` that stores known objects by reference into a storage.

    Objects whose class has a store in `storage._obj_store` are saved there
    and replaced by their UUID and the name of the store; on `build` these
    references are resolved by loading from `storage._stores`.
    """

    def __init__(self, storage, unit_system=None):
        """
        Parameters
        ----------
        storage : object
            the storage; `_obj_store` and `_stores` of it are used here
        unit_system : object or None
            passed on to `ObjectJSON`
        """
        super(UUIDObjectJSON, self).__init__(unit_system)
        self.storage = storage
        # the `json` key is never written when simplifying dicts
        self.excluded_keys = ['json']

    def simplify(self, obj, base_type=''):
        """
        Simplify `obj`, replacing storable objects by references.

        Returns `{'_storage': 'self'}` for the storage itself, a
        `_hex_uuid`/`_store` reference for a saved object, and otherwise
        whatever `ObjectJSON.simplify` returns.
        """
        if obj is self.storage:
            return {'_storage': 'self'}

        klass = obj.__class__
        has_store = (
            klass.__module__ != '__builtin__'
            and klass in self.storage._obj_store)

        if has_store and not obj._ignore:
            # save the object in its own store and only keep a reference
            target = self.storage._obj_store[klass]
            target.save(obj)
            reference = {
                '_hex_uuid': hex(obj.__uuid__),
                '_store': target.name}
            return reference

        return super(UUIDObjectJSON, self).simplify(obj, base_type)

    def build(self, obj):
        """
        Rebuild `obj`, resolving storage and object references.

        A reference is a dict with `_store` and either `_obj_uuid` (UUID
        string) or `_hex_uuid` (hexadecimal integer string).
        """
        if type(obj) is dict:
            if obj.get('_storage') == 'self':
                return self.storage

            if '_store' in obj:
                if '_obj_uuid' in obj:
                    source = self.storage._stores[obj['_store']]
                    return source.load(int(UUID(obj['_obj_uuid'])))

                if '_hex_uuid' in obj:
                    source = self.storage._stores[obj['_store']]
                    return source.load(long(obj['_hex_uuid'], 16))

        return super(UUIDObjectJSON, self).build(obj)
# a little code snippet to wrap strings around for nicer output
# idea found @ http://www.saltycrane.com/blog/2007/09/python-word-wrap-function/
def word_wrap(string, width=80):
    """
    Wrap the lines of a string at whitespace for nicer output.

    Each line is stripped of trailing whitespace and, while it is longer
    than `width`, broken at the last whitespace character found within its
    first `width` characters. If these contain no whitespace the word is
    kept intact and the line is broken at the next whitespace after it.

    Parameters
    ----------
    string : str
        the text to be wrapped; existing newlines are kept
    width : int
        the maximal line length aimed for (default 80)

    Returns
    -------
    str
        the wrapped text; only single words longer than `width` exceed it
    """
    result = []
    for line in string.split('\n'):
        line = line.rstrip()
        while len(line) > width:
            # look backwards from the last allowed column for a break point
            marker = width - 1
            while marker >= 0 and not line[marker].isspace():
                marker -= 1

            if marker < 0:
                # no whitespace in the first `width` characters: do not cut
                # the word, use the first whitespace after it instead
                marker = max(width, 0)
                while marker < len(line) and not line[marker].isspace():
                    marker += 1
                if marker == len(line):
                    # a single unbreakable word is left
                    break

            result.append(line[0:marker])
            line = line[marker + 1:]

        result.append(line)

    return '\n'.join(result)