from numpy import (empty, sqrt, mean, array, einsum,
exp, floor, log10, absolute, load)
from math import pi
from os import path
from contextlib import suppress
from contextlib import contextmanager
from io import StringIO
import sys
from mango.constants import c, _variables
from mango.io import _input
from mango.imports import inquirer
from mango.arguments import column_manip
from mango.pp.energy import energy_calc
[docs]@contextmanager
def captured_output():
nout, nerr = StringIO(), StringIO()
oout, oerr = sys.stdout, sys.stderr
try:
sys.stdout, sys.stderr = nout, nerr
yield sys.stdout, sys.stderr
finally:
sys.stdout, sys.stderr = oout, oerr
[docs]def getvars(varib, lst):
rlst = []
for x in lst:
try:
if x in ['nmax', 'written', 'skip_iters']:
rlst.append(int(varib[x]))
else:
rlst.append(varib[x])
except KeyError:
rlst.append(None)
with suppress(ValueError):
lst[lst.index('no_molecules')] = 'no_mol'
return _variables(**dict(zip(lst, rlst)))
[docs]def get_expon(number):
return (floor(log10(abs(number)))).astype(int).astype(float)
[docs]def at_least2d_end(*args):
lst = []
for i in args:
if len(i.shape) < 2:
lst.append(i[:, None])
else:
lst.append(i)
if len(lst) == 1:
lst = lst[0]
return lst
[docs]def interactive_getfiles(flg, check_name, files):
"""
Get postprocessing jobs if none selected, Check file exists.
Parameters
----------
Error: func
flg: instance
instance of required flags
check_name: str
name of file to read in
files: list
list of files in selected directory
"""
if (not (flg.column or flg.lengthcheck or
flg.files['xyz'] or flg.files['energy'] or flg.files['momenta'] or flg.kinetic) and
((flg.suscep and isinstance(flg.suscep, bool) or not flg.suscep))):
inq, (Checkbox, ConsoleRender) = inquirer()
if inq and c.tinfo['otty']:
c.Error(">G Nothing to do please select:\n ")
choices = {'XYZ File': "xyz", 'Momenta File': "momenta", 'Energy File': "energy",
'Raw Data plotting': "column", 'Susceptibility': "suscep"}
question = Checkbox('pp', message="{}{}".format("Which postprocessing tool do you want?",
" (right arrow to select)"),
choices=choices.keys())
selections = set(choices.keys()).intersection(ConsoleRender().render(question, {}))
if selections == set():
print('')
c.Error("F No postprocessing tool selected" + 30 * ' ')
for i in selections:
if i == "Raw Data plotting":
flg.column, flg.average, flg.files = column_manip([_input("Enter x string: "),
_input("Enter y string: ")],
flg.average, flg.files)
elif i == "Susceptibility":
flg.suscep = _input("Graphs [chi mag vel]?: ").split()
else:
# av = _input("Averaged {} [y/n]?: ".format(i))
# flg.average[choices[i][0]] = (av == "y")
flg.files[choices[i]] = True
else:
c.Error("F No postprocessing tool selected")
while not any(file.startswith(check_name) for file in files) and c.tinfo['otty']:
c.Error(">G Cannot find {}run".format("Susceptibility " if flg.suscep else ""))
run_number = _input("\nRun number or rerun [y/n]?: ")
if run_number == "y" or run_number == "yes":
flg.run = False
break
elif run_number == "n" or run_number == "no":
c.Error(">ME Exiting\n\n")
try:
flg.run = int(run_number)
check_name = '{}Run{:g}_mol-'.format("S_" if flg.suscep else "", flg.run)
except ValueError:
c.Error("F Not a number, Exiting")
if not c.tinfo['otty'] and not any(file.startswith(check_name) for file in files):
c.Error("F Run not found")
return flg, check_name
[docs]def getlocation(keys, dictionary, stats, kd={}):
"""
Apply any basic arithmetic to keys with newly found location.
Arrays can be split eg array[:,1] etc
"""
for no, xy in enumerate(keys):
ustr = string = ''
op = {}
units = {}
std_var = {}
sumkey = {}
ar = {}
for k, key in enumerate(xy):
if key[1] == 'False':
op[k] = key[0]
string += " {} ".format(op[k])
else:
sumkey[k], units[k] = dictfind(key[0], dictionary)
string += "sumkey[{}]".format(k)
if key[1] != 'True':
ar[k] = key[1]
string += ar[k]
add = {o: p for o, p in op.items() if p in ["*", "/", "-", "+"]}
if len(units.keys()) > 1:
for num in range(k + 1):
if num in units:
ustr += units[num]
if num in add:
ustr += add[num]
else:
ustr += units[0]
# If numexpr becomes a req this is safer and quicker
#
# string = string.split()
# new_str = ""
# for i, j in enumerate(string):
# if j in ["*", "/", "-", "+"]:
# new_str += j
# else:
# exec('var_' + str(i) + ' = ' + j)
# new_str += "var_{}".format(i)
# evaled = evaluate(new_str)
evaled = eval(string)
kd[no] = [evaled, ustr]
return kd[0], kd[1]
[docs]def dictfind(key, dictionary):
"""
Find key in nested dictionary, Returns as soon as found.
Parameters
----------
key: str
key to find
dictionary: dict
2 level dictionary of all data
Returns
-------
d: ndarray
data array
u: str
units
std_var: ndarray/bool
standard deviation array, False if data not averaged
"""
av = ["x", "e", "m"]
if key == "time":
return dictionary["time"], c.units["time"]
for no, i in enumerate(["xyz", "energy", "momenta"]):
with suppress(TypeError, KeyError):
d = dictionary[i][key]
u = c.units[i][key]
return d, u
[docs]class loader():
"""Load postprocessed data files."""
def __init__(self, flg, run, directory):
self.flg = flg
self.run = run
self.directory = directory
[docs] def lengthcheck(self, block):
"""Load numpy length check array."""
name = "{}{}_distance_arrays_{}{}.npz".format(self.directory, "{}Run{}".format(
'S_' if self.flg.suscep else '', self.run), block, "_ALIGNED" if self.flg.align else '')
return load(name)
[docs]class Dataarrays():
"""Data array collection."""
def __init__(self, flg, reader, var, name, eq=0.1):
"""
flg - files kinetic lengthcheck save_type ufile suscep
var - stats no_mol skip_iters epsilon sigma limit mass
"""
var.stats.sort()
# stats = [0, 1]
self.flg = flg
self.var = var
self.reader = reader
self.name = name
print("Checking Files...", end='', flush=True)
self.datalen, self.names = self._showmissing()
self.length()
print("Done")
self.xyz_data = self.momenta_data = self.energy_data = None
self.angular = self.pos_com = None
self.eq = int(self.var.skip_iters * eq)
skip_iters = int(self.var.skip_iters - self.eq)
if flg.files['xyz'] or flg.files['energy'] or flg.kinetic or flg.lengthcheck:
self.xyz_data = empty((self.datalen, skip_iters, self.var.no_mol, 12))
if flg.files['energy']:
self.energy_data = empty((self.datalen, self.var.skip_iters, 4))
if flg.files['momenta']:
self.momenta_data = empty((self.datalen, self.var.skip_iters, 15))
self.sus_data, self.sus_list, self.slocs = self._sus_em(skip_iters, self.var.no_mol)
def _showmissing(self, fmiss='', names={}):
"""Show missing statistics from list."""
stat_copy = []
for stat in self.var.stats:
self.name = "{}.{:g}.{}".format(self.name.rsplit(".", maxsplit=2)[0], stat + 1,
self.flg.save_type if not self.flg.ufile else self.flg.ufile[0].split('.')[-1])
if path.isfile(self.name):
names[stat] = self.name
stat_copy.append(stat)
self.var.stats = stat_copy
rsts = list(set(self.var.stats).difference(names.keys()))
if len(rsts) > 0:
splt = [0] + [i for i in range(1, len(rsts)) if rsts[i] - rsts[i - 1] > 1] + [None]
missing = [rsts[b:e] for (b, e) in [(splt[i - 1], splt[i]) for i in range(1, len(splt))]]
for m in missing:
if len(m) > 1:
fmiss += "{} to {}, ".format(m[0] + 1, m[-1] + 1)
else:
fmiss += "{}, ".format(m[0] + 1)
c.Error(">W Can't find the following repetitions: {}".format(fmiss[:-2]))
return len(list(names.keys())), names
[docs] def length(self):
lstore = []
self.pstore = {}
for n in self.names.values():
p, ls = self.reader.read(fname=n, lengthcheck=True)
self.pstore[n] = p
lstore.append(ls)
small = min(lstore) + 1
if small < self.var.skip_iters:
c.Error(f">W All runs not the same length shortest simulation is {small}")
self.var.skip_iters = small
def _sus_em(self, skip_iters, no_mol, sus_list=[], sus_data=None, slocs=None):
"""Create empty arrays as needed for susceptibility data."""
if self.flg.suscep and not isinstance(self.flg.suscep, bool):
sus_data = {}
slocs = {"pos": [(0, 3), 'position'],
"mom": [(3, 6), 'momentum'],
"mag": [(6, 9), 'magnetisation']}
sus_list = list(slocs.keys())
sus_list += ['angular', 'inertia']
for i in sus_list:
sus_data[i] = empty((self.datalen, skip_iters, no_mol, 3))
return sus_data, sus_list, slocs
def _em_vars(self, epsilon=None, sigma=None, limit=None, mass=None):
for i, j in zip(['epsilon', 'sigma', 'limit', 'mass'], [epsilon, sigma, limit, mass]):
if j is not None:
setattr(self.var, i, j)
[docs] def energy(self, stat, name, epsilon=None, sigma=None, limit=None, mass=None):
"""Get total energy data."""
# TODO expose more energy data to user
self._em_vars(epsilon, sigma, limit, mass)
if self.flg.files['energy']:
self.energy_sv = energy_calc(self.pstore[name][:self.var.skip_iters],
self.reader.read("momentum", name)[:self.var.skip_iters],
self.reader.read("magnetisation", name)[:self.var.skip_iters],
self.var.epsilon, self.var.sigma, self.var.limit, self.var.mass)
self.energy_data[stat, :, 0] = self.energy_sv['kinetic']
self.energy_data[stat, :, 1] = self.energy_sv['trans_pot']
self.energy_data[stat, :, 2] = self.energy_sv['mag_pot']
self.energy_data[stat, :, 3] = self.energy_sv['total_E']
[docs] def momenta(self, stat, name, epsilon=None, sigma=None, limit=None, mass=None):
"""Get momenta data."""
self._em_vars(epsilon, sigma, limit, mass)
if self.flg.files['momenta']:
if hasattr(self, 'energy_sv'):
momenta = self.energy_sv
else:
momenta = energy_calc(self.pstore[name][:self.var.skip_iters],
self.reader.read("momentum", name)[:self.var.skip_iters],
self.reader.read("magnetisation", name)[:self.var.skip_iters],
self.var.epsilon, self.var.sigma, self.var.limit, self.var.mass)
self.momenta_data[stat, :, 0:3] = momenta['total_M']
self.momenta_data[stat, :, 3:6] = momenta['total_mag']
self.momenta_data[stat, :, 6:9] = momenta['total_angular']
self.momenta_data[stat, :, 9:12] = momenta['CoM']
self.momenta_data[stat, :, 12:15] = momenta['CoM_vel']
[docs] def sus(self, stat, name):
"""Get susceptibility data."""
if self.flg.suscep and not isinstance(self.flg.suscep, bool):
for i in self.sus_list:
if i in ['angular', 'inertia']:
pass
elif not (self.flg.files['xyz'] or self.flg.files['energy'] or self.flg.kinetic or self.flg.lengthcheck):
if self.slocs[i][1] == "position":
self.sus_data[i][stat, ...] = self.pstore[name][self.eq:self.var.skip_iters]
else:
self.sus_data[i][stat, ...] = self.reader.read(self.slocs[i][1], name)[self.eq:self.var.skip_iters]
else:
self.sus_data[i][stat, ...] = self.xyz_data[stat, :, :, self.slocs[i][0][0]:self.slocs[i][0][1]]
[docs] def xyz(self, stat, name):
"""Get xyz data."""
if self.flg.files['xyz'] or self.flg.kinetic or self.flg.lengthcheck:
self.xyz_data[stat, :, :, 0:3] = self.pstore[name][self.eq:self.var.skip_iters]
self.xyz_data[stat, :, :, 3:6] = self.reader.read("momentum", name)[self.eq:self.var.skip_iters]
self.xyz_data[stat, :, :, 6:9] = self.reader.read("magnetisation", name)[self.eq:self.var.skip_iters]
self.xyz_data[stat, :, :, 9:12] = self.reader.read("forces", name)[self.eq:self.var.skip_iters]
[docs]def showgraph(mpl, showg=False):
"""Show graphs onscreen at end of run."""
if showg:
show, get_fignums = mpl.showg()
if get_fignums():
if not c.using_IP:
c.Error("{}{}{}".format("M Press Enter to close all graphs,\n",
"Space to close current graph\n",
"Click on legend to show/hide lines"))
with suppress(AttributeError):
show()
else:
c.Error("M >No Graph to Show")
[docs]class onevent():
def __init__(self, mpl):
self.close = mpl.close()
self.figcall = {}
from matplotlib.lines import Line2D
self.l2d = Line2D
[docs] def connect(self, fig, leg=None, line=None):
"""
Connect figures to events.
Parameters
----------
fig: figure
leg: legend
line: line
"""
if fig.number not in self.figcall:
self.figcall[fig.number] = {"key": fig.canvas.mpl_connect('key_press_event', self.onkey),
"pick": fig.canvas.mpl_connect('pick_event', self.onpick),
"fig": fig}
self.figcall[fig.number]['scales'] = self.fig_d(fig)
if leg is not None:
if "ld" in self.figcall[fig.number]:
self.figcall[fig.number]['ld'].update(self.line_d(leg.legendHandles, line))
else:
self.figcall[fig.number]['ld'] = self.line_d(leg.legendHandles, line)
[docs] @staticmethod
def fig_d(fig):
"""
Set picker for yaxis.
Parameters
----------
fig: fig instance
"""
scales = {}
for ax in fig.axes:
ax.get_yaxis().set_picker(100)
scales[ax] = {ax.get_yscale(): ax.get_ylim()}
return scales
[docs] @staticmethod
def line_d(legh, line):
"""
Set picker for legend.
Parameters
----------
legh: legendHandles
line: line object
Returns
-------
dict:
a dictionary linking legend entry and line
"""
lst = {}
for l, o in zip(legh, line):
l.set_picker(5)
lst[l] = o
return lst
# def setlim(self, xmin, xmax, ymin, ymax):
# self.figcall[self.cn]['xm'] = xmin
# self.figcall[self.cn]['ym'] = ymin
# self.figcall[self.cn]['xM'] = xmax
# self.figcall[self.cn]['yM'] = ymax
[docs] def onpick(self, event):
"""Switch lines visibility."""
fig_dict = self.figcall[event.canvas.figure.number]
lline = event.artist
if lline in fig_dict['ld']:
oline = fig_dict['ld'][lline]
if isinstance(oline, self.l2d):
vis = not oline.get_visible()
oline.set_visible(vis)
else:
for o in oline:
vis = not o.get_visible()
o.set_visible(vis)
lline.set_alpha(1.0) if vis else lline.set_alpha(0.2)
else:
ax = event.artist.axes
scale = ax.get_yscale()
if scale not in fig_dict['scales'][ax]:
fig_dict['scales'][ax][scale] = ax.get_ylim()
ax.set_yscale("linear" if scale == 'log' else 'log')
with suppress(KeyError):
ax.set_ylim(fig_dict['scales'][ax]["linear" if scale == 'log' else 'log'])
fig_dict['fig'].canvas.draw()
[docs] def onkey(self, event):
"""On key events."""
if event.key == "enter":
self.close('all')
elif event.key == " ":
self.close(event.canvas.figure.number)