Source code for mango.pp.util

from numpy import (empty, sqrt, mean, array, einsum,
                   exp, floor, log10, absolute, load)
from math import pi
from os import path
from contextlib import suppress
from contextlib import contextmanager
from io import StringIO
import sys

from mango.constants import c, _variables
from mango.io import _input
from mango.imports import inquirer
from mango.arguments import column_manip
from mango.pp.energy import energy_calc


@contextmanager
def captured_output():
    """
    Temporarily replace ``sys.stdout`` and ``sys.stderr`` with in-memory buffers.

    Yields
    -------
    tuple of StringIO
        the buffers standing in for (stdout, stderr) while the ``with``
        body runs; the previous streams are put back on exit, also when
        the body raises

    """
    saved_streams = sys.stdout, sys.stderr
    buffer_out, buffer_err = StringIO(), StringIO()
    try:
        sys.stdout, sys.stderr = buffer_out, buffer_err
        yield buffer_out, buffer_err
    finally:
        sys.stdout, sys.stderr = saved_streams
def getvars(varib, lst):
    """
    Collect the requested entries of ``varib`` into a ``_variables`` object.

    Parameters
    ----------
    varib: mapping
        source of the values, indexed with the names in ``lst``
    lst: list
        names to collect. Modified in place: an entry 'no_molecules' is
        renamed to 'no_mol' once its value has been looked up

    Returns
    -------
    _variables
        constructed with one keyword argument per (renamed) name; names
        missing from ``varib`` are passed as None

    """
    integer_names = ('nmax', 'written', 'skip_iters')
    values = []
    for name in lst:
        try:
            value = varib[name]
            if name in integer_names:
                value = int(value)
        except KeyError:
            value = None
        values.append(value)

    # The lookup above uses the long name, the keyword passed on is the short one
    if 'no_molecules' in lst:
        lst[lst.index('no_molecules')] = 'no_mol'

    return _variables(**dict(zip(lst, values)))
def get_expon(number):
    """
    Return the base-10 exponent of a number (or of each element of an array).

    Parameters
    ----------
    number: scalar or array_like
        value(s) to inspect; the sign is ignored

    Returns
    -------
    numpy.float64 or ndarray
        ``floor(log10(|number|))`` as float; zeros give an exponent of 0

    """
    magnitude = absolute(number)
    # log10(0) is -inf, which turns into a meaningless integer when cast.
    # Adding the boolean mask maps every zero to 1 (exponent 0) and leaves
    # all other magnitudes untouched.
    magnitude = magnitude + (magnitude == 0)
    return floor(log10(magnitude)).astype(int).astype(float)
def at_least2d_end(*args):
    """
    Give every array at least two dimensions by appending a trailing axis.

    Parameters
    ----------
    *args: ndarray
        arrays to check; those with fewer than two dimensions are
        indexed with ``[:, None]``, the others are passed through as is

    Returns
    -------
    ndarray or list
        the single resulting array when exactly one argument was given,
        otherwise a list with one entry per argument

    """
    expanded = [arr if len(arr.shape) >= 2 else arr[:, None] for arr in args]
    return expanded[0] if len(expanded) == 1 else expanded
def interactive_getfiles(flg, check_name, files):
    """
    Get postprocessing jobs if none selected, Check file exists.

    Parameters
    ----------
    flg: instance
        instance of required flags; updated in place with the selected
        jobs and, if the user enters one, the run number
    check_name: str
        name of file to read in
    files: list
        list of files in selected directory

    Returns
    -------
    flg: instance
        the (possibly updated) flags
    check_name: str
        the (possibly updated) file name prefix

    """
    # Nothing selected: no plotting/file job at all, and suscep is either
    # unset or a bare True (no graph list given)
    if (not (flg.column or flg.lengthcheck or flg.files['xyz'] or flg.files['energy'] or flg.files['momenta'] or flg.kinetic) and
            ((flg.suscep and isinstance(flg.suscep, bool) or not flg.suscep))):
        inq, (Checkbox, ConsoleRender) = inquirer()
        # Only ask interactively when inquirer is usable and c.tinfo['otty'] is set
        if inq and c.tinfo['otty']:
            c.Error(">G Nothing to do please select:\n ")
            # Menu label -> flag name
            choices = {'XYZ File': "xyz", 'Momenta File': "momenta", 'Energy File': "energy",
                       'Raw Data plotting': "column", 'Susceptibility': "suscep"}
            question = Checkbox('pp', message="{}{}".format("Which postprocessing tool do you want?",
                                                             " (right arrow to select)"), choices=choices.keys())
            selections = set(choices.keys()).intersection(ConsoleRender().render(question, {}))

            if selections == set():
                print('')
                # NOTE(review): the "F" prefix presumably makes c.Error abort - confirm
                c.Error("F No postprocessing tool selected" + 30 * ' ')

            for i in selections:
                if i == "Raw Data plotting":
                    flg.column, flg.average, flg.files = column_manip([_input("Enter x string: "),
                                                                       _input("Enter y string: ")],
                                                                      flg.average, flg.files)
                elif i == "Susceptibility":
                    flg.suscep = _input("Graphs [chi mag vel]?: ").split()
                else:
                    # av = _input("Averaged {} [y/n]?: ".format(i))
                    # flg.average[choices[i][0]] = (av == "y")
                    flg.files[choices[i]] = True
        else:
            c.Error("F No postprocessing tool selected")

    # Keep asking for a run number until a file starting with check_name exists
    while not any(file.startswith(check_name) for file in files) and c.tinfo['otty']:
        c.Error(">G Cannot find {}run".format("Susceptibility " if flg.suscep else ""))
        run_number = _input("\nRun number or rerun [y/n]?: ")
        if run_number == "y" or run_number == "yes":
            # Rerun requested: clear the run number and stop asking
            flg.run = False
            break
        elif run_number == "n" or run_number == "no":
            # NOTE(review): presumably exits here; otherwise "n" falls through
            # to int() below and ends in the "Not a number" error - confirm
            c.Error(">ME Exiting\n\n")
        try:
            flg.run = int(run_number)
            check_name = '{}Run{:g}_mol-'.format("S_" if flg.suscep else "", flg.run)
        except ValueError:
            c.Error("F Not a number, Exiting")
    # Without c.tinfo['otty'] there is nobody to ask, so a missing run is an error
    if not c.tinfo['otty'] and not any(file.startswith(check_name) for file in files):
        c.Error("F Run not found")
    return flg, check_name
def getlocation(keys, dictionary, stats, kd=None):
    """
    Apply any basic arithmetic to keys with newly found location.

    Arrays can be split eg array[:,1] etc

    Parameters
    ----------
    keys: sequence
        one entry per axis, each a sequence of ``(token, flag)`` pairs:
        flag 'False' copies the token verbatim into the expression
        (operators, literals), flag 'True' looks the token up with
        ``dictfind``, any other flag is additionally appended to the
        looked-up array as an index string
    dictionary: dict
        data handed to ``dictfind``
    stats:
        unused, kept for interface compatibility
    kd: dict, optional
        storage for the results, indexed by the position in ``keys``;
        a new dict is created for every call when omitted

    Returns
    -------
    tuple
        ``kd[0], kd[1]``, each a list ``[evaluated data, unit string]``

    Raises
    ------
    KeyError
        if fewer than two entries end up in ``kd``

    """
    # A new dict per call: a shared default dict would hand results of an
    # earlier call back to a later one
    if kd is None:
        kd = {}
    arithmetic = ("*", "/", "-", "+")

    for no, xy in enumerate(keys):
        ustr = string = ''
        op = {}
        units = {}
        # NOTE: the expression evaluated below refers to this dict by the
        # name `sumkey`; do not rename it
        sumkey = {}
        for k, key in enumerate(xy):
            if key[1] == 'False':
                op[k] = key[0]
                string += " {} ".format(op[k])
            else:
                sumkey[k], units[k] = dictfind(key[0], dictionary)
                string += "sumkey[{}]".format(k)
                if key[1] != 'True':
                    string += key[1]

        if len(units) > 1:
            # Interleave the units with the arithmetic operators between them
            for num in range(len(xy)):
                if num in units:
                    ustr += units[num]
                if op.get(num) in arithmetic:
                    ustr += op[num]
        elif units:
            # Single data term: use its units wherever it sits in the expression
            ustr += next(iter(units.values()))

        # If numexpr becomes a req this is safer and quicker
        #
        # string = string.split()
        # new_str = ""
        # for i, j in enumerate(string):
        #     if j in ["*", "/", "-", "+"]:
        #         new_str += j
        #     else:
        #         exec('var_' + str(i) + ' = ' + j)
        #         new_str += "var_{}".format(i)
        # evaled = evaluate(new_str)

        # SECURITY: the expression is assembled from the caller-supplied
        # tokens and evaluated as Python; never feed untrusted input here
        evaled = eval(string)
        kd[no] = [evaled, ustr]
    return kd[0], kd[1]
def dictfind(key, dictionary):
    """
    Find key in nested dictionary, Returns as soon as found.

    Parameters
    ----------
    key: str
        key to find
    dictionary: dict
        2 level dictionary of all data

    Returns
    -------
    d: ndarray
        data array
    u: str
        units

    ``None`` is returned instead of the pair when the key is in none of
    the "xyz", "energy" or "momenta" groups.

    """
    if key == "time":
        return dictionary["time"], c.units["time"]

    for group in ("xyz", "energy", "momenta"):
        # A group that is missing, not subscriptable or lacks the key is skipped
        with suppress(TypeError, KeyError):
            d = dictionary[group][key]
            u = c.units[group][key]
            return d, u
    return None
class loader():
    """Load postprocessed data files."""

    def __init__(self, flg, run, directory):
        """
        Store what is needed to build the file names.

        Parameters
        ----------
        flg: instance
            flags; ``suscep`` and ``align`` are read
        run:
            run number used in the file name
        directory: str
            prefix put directly in front of the file name
            (no path separator is added)

        """
        self.flg, self.run, self.directory = flg, run, directory

    def lengthcheck(self, block):
        """
        Load numpy length check array.

        Parameters
        ----------
        block:
            block identifier used in the file name

        Returns
        -------
        the object returned by ``numpy.load`` for
        ``<directory>[S_]Run<run>_distance_arrays_<block>[_ALIGNED].npz``

        """
        prefix = 'S_' if self.flg.suscep else ''
        suffix = "_ALIGNED" if self.flg.align else ''
        return load(f"{self.directory}{prefix}Run{self.run}_distance_arrays_{block}{suffix}.npz")
class Dataarrays():
    """
    Data array collection.

    Locates the files of every requested repetition, reads their
    positions and allocates the arrays later filled by `xyz`, `energy`,
    `momenta` and `sus`.
    """

    def __init__(self, flg, reader, var, name, eq=0.1):
        """
        Check the files and allocate the arrays required by the flags.

        Parameters
        ----------
        flg: instance
            flags, uses: files kinetic lengthcheck save_type ufile suscep
        reader: instance
            object with a ``read`` method used to load the data
        var: instance
            variables, uses: stats no_mol skip_iters epsilon sigma limit mass.
            ``stats`` is sorted in place and reduced to the repetitions found,
            ``skip_iters`` is lowered to the shortest run
        name: str
            file name of one repetition, ``<base>.<number>.<extension>``
        eq: float
            fraction of ``skip_iters`` dropped from the start of the xyz
            and susceptibility arrays

        """
        var.stats.sort()
        # stats = [0, 1]
        self.flg = flg
        self.var = var
        self.reader = reader
        self.name = name

        print("Checking Files...", end='', flush=True)
        self.datalen, self.names = self._showmissing()
        self.length()
        print("Done")

        self.xyz_data = self.momenta_data = self.energy_data = None
        self.angular = self.pos_com = None

        self.eq = int(self.var.skip_iters * eq)
        skip_iters = int(self.var.skip_iters - self.eq)

        if flg.files['xyz'] or flg.files['energy'] or flg.kinetic or flg.lengthcheck:
            self.xyz_data = empty((self.datalen, skip_iters, self.var.no_mol, 12))
        if flg.files['energy']:
            self.energy_data = empty((self.datalen, self.var.skip_iters, 4))
        if flg.files['momenta']:
            self.momenta_data = empty((self.datalen, self.var.skip_iters, 15))

        self.sus_data, self.sus_list, self.slocs = self._sus_em(skip_iters, self.var.no_mol)

    def _showmissing(self, fmiss='', names=None):
        """
        Show missing statistics from list.

        Parameters
        ----------
        fmiss: str
            text put in front of the list of missing repetitions
        names: dict, optional
            storage for the files found, a new dict is used when omitted

        Returns
        -------
        int
            number of entries in ``names``
        dict
            repetition index -> file name, for every file that exists

        """
        # A new dict per call: a shared default dict would carry the files
        # of a previous instance over into this one
        if names is None:
            names = {}

        requested = list(self.var.stats)
        extension = self.flg.save_type if not self.flg.ufile else self.flg.ufile[0].split('.')[-1]
        found = []
        for stat in requested:
            self.name = "{}.{:g}.{}".format(self.name.rsplit(".", maxsplit=2)[0], stat + 1, extension)
            if path.isfile(self.name):
                names[stat] = self.name
                found.append(stat)

        self.var.stats = found

        # Compare against what was requested, not against the already
        # reduced list, otherwise nothing is ever reported as missing
        absent = sorted(set(requested).difference(names))
        if absent:
            # Group consecutive repetitions so they print as "a to b"
            runs = [[absent[0]]]
            for stat in absent[1:]:
                if stat - runs[-1][-1] > 1:
                    runs.append([stat])
                else:
                    runs[-1].append(stat)
            for run in runs:
                if len(run) > 1:
                    fmiss += "{} to {}, ".format(run[0] + 1, run[-1] + 1)
                else:
                    fmiss += "{}, ".format(run[0] + 1)
            c.Error(">W Can't find the following repetitions: {}".format(fmiss[:-2]))

        return len(names), names

    def length(self):
        """
        Read the positions of every file and limit ``skip_iters`` to the shortest run.

        The positions are stored in ``self.pstore`` indexed by file name.
        """
        lstore = []
        self.pstore = {}
        for n in self.names.values():
            p, ls = self.reader.read(fname=n, lengthcheck=True)
            self.pstore[n] = p
            lstore.append(ls)

        small = min(lstore) + 1
        if small < self.var.skip_iters:
            c.Error(f">W All runs not the same length shortest simulation is {small}")
            self.var.skip_iters = small

    def _sus_em(self, skip_iters, no_mol, sus_list=None, sus_data=None, slocs=None):
        """
        Create empty arrays as needed for susceptibility data.

        Returns
        -------
        sus_data: dict or None
            one empty array per entry of ``sus_list``
        sus_list: list
            names of the susceptibility arrays, empty if not required
        slocs: dict or None
            name -> [(start, stop) columns in ``xyz_data``, reader name]

        """
        if sus_list is None:
            sus_list = []
        if self.flg.suscep and not isinstance(self.flg.suscep, bool):
            sus_data = {}
            slocs = {"pos": [(0, 3), 'position'],
                     "mom": [(3, 6), 'momentum'],
                     "mag": [(6, 9), 'magnetisation']}
            sus_list = list(slocs.keys())
            sus_list += ['angular', 'inertia']
            for i in sus_list:
                sus_data[i] = empty((self.datalen, skip_iters, no_mol, 3))
        return sus_data, sus_list, slocs

    def _em_vars(self, epsilon=None, sigma=None, limit=None, mass=None):
        """Overwrite the energy variables of ``self.var`` that are not None."""
        for i, j in zip(['epsilon', 'sigma', 'limit', 'mass'], [epsilon, sigma, limit, mass]):
            if j is not None:
                setattr(self.var, i, j)

    def _xyz_filled(self):
        """Return True when `xyz` fills ``xyz_data`` for the current flags."""
        return bool(self.flg.files['xyz'] or self.flg.kinetic or self.flg.lengthcheck)

    def _energy_calc(self, name):
        """Run ``energy_calc`` on the first ``skip_iters`` iterations of file ``name``."""
        last = self.var.skip_iters
        return energy_calc(self.pstore[name][:last],
                           self.reader.read("momentum", name)[:last],
                           self.reader.read("magnetisation", name)[:last],
                           self.var.epsilon, self.var.sigma, self.var.limit, self.var.mass)

    def energy(self, stat, name, epsilon=None, sigma=None, limit=None, mass=None):
        """
        Get total energy data.

        Fills ``energy_data[stat]`` with the columns kinetic, trans_pot,
        mag_pot and total_E, and keeps the full result in ``energy_sv``.
        """
        # TODO expose more energy data to user
        self._em_vars(epsilon, sigma, limit, mass)
        if self.flg.files['energy']:
            self.energy_sv = self._energy_calc(name)
            self.energy_data[stat, :, 0] = self.energy_sv['kinetic']
            self.energy_data[stat, :, 1] = self.energy_sv['trans_pot']
            self.energy_data[stat, :, 2] = self.energy_sv['mag_pot']
            self.energy_data[stat, :, 3] = self.energy_sv['total_E']

    def momenta(self, stat, name, epsilon=None, sigma=None, limit=None, mass=None):
        """
        Get momenta data.

        Fills ``momenta_data[stat]`` with total_M, total_mag, total_angular,
        CoM and CoM_vel, three columns each.
        """
        self._em_vars(epsilon, sigma, limit, mass)
        if self.flg.files['momenta']:
            # NOTE(review): energy_sv is whatever the last `energy` call
            # produced; this assumes `energy` was called for the same file
            # just before - confirm against the caller
            if hasattr(self, 'energy_sv'):
                momenta = self.energy_sv
            else:
                momenta = self._energy_calc(name)
            self.momenta_data[stat, :, 0:3] = momenta['total_M']
            self.momenta_data[stat, :, 3:6] = momenta['total_mag']
            self.momenta_data[stat, :, 6:9] = momenta['total_angular']
            self.momenta_data[stat, :, 9:12] = momenta['CoM']
            self.momenta_data[stat, :, 12:15] = momenta['CoM_vel']

    def sus(self, stat, name):
        """
        Get susceptibility data.

        The 'angular' and 'inertia' arrays are not filled here.
        """
        if self.flg.suscep and not isinstance(self.flg.suscep, bool):
            # xyz_data is allocated for energy runs as well, but `xyz` does
            # not fill it for them; only copy from it when it holds data
            from_xyz = self._xyz_filled()
            first, last = self.eq, self.var.skip_iters
            for i in self.sus_list:
                if i in ['angular', 'inertia']:
                    continue
                (start, stop), source = self.slocs[i]
                if from_xyz:
                    self.sus_data[i][stat, ...] = self.xyz_data[stat, :, :, start:stop]
                elif source == "position":
                    self.sus_data[i][stat, ...] = self.pstore[name][first:last]
                else:
                    self.sus_data[i][stat, ...] = self.reader.read(source, name)[first:last]

    def xyz(self, stat, name):
        """
        Get xyz data.

        Fills ``xyz_data[stat]`` with position, momentum, magnetisation
        and forces, three columns each.
        """
        if self._xyz_filled():
            first, last = self.eq, self.var.skip_iters
            self.xyz_data[stat, :, :, 0:3] = self.pstore[name][first:last]
            self.xyz_data[stat, :, :, 3:6] = self.reader.read("momentum", name)[first:last]
            self.xyz_data[stat, :, :, 6:9] = self.reader.read("magnetisation", name)[first:last]
            self.xyz_data[stat, :, :, 9:12] = self.reader.read("forces", name)[first:last]
def showgraph(mpl, showg=False):
    """
    Show graphs onscreen at end of run.

    Parameters
    ----------
    mpl: instance
        object whose ``showg()`` returns the pair (show, get_fignums)
    showg: bool
        nothing is done unless this is true

    """
    if not showg:
        return

    show, get_fignums = mpl.showg()
    if not get_fignums():
        c.Error("M >No Graph to Show")
        return

    if not c.using_IP:
        c.Error("M Press Enter to close all graphs,\n"
                "Space to close current graph\n"
                "Click on legend to show/hide lines")
    with suppress(AttributeError):
        show()
class onevent():
    """Key and pick event handling for connected figures."""

    def __init__(self, mpl):
        """
        Parameters
        ----------
        mpl: instance
            object whose ``close()`` returns the callable used to close
            figures (called with 'all' or a figure number)

        """
        self.close = mpl.close()
        self.figcall = {}
        from matplotlib.lines import Line2D
        self.l2d = Line2D

    def connect(self, fig, leg=None, line=None):
        """
        Connect figures to events.

        Parameters
        ----------
        fig: figure
        leg: legend
        line: line

        """
        if fig.number not in self.figcall:
            self.figcall[fig.number] = {"key": fig.canvas.mpl_connect('key_press_event', self.onkey),
                                        "pick": fig.canvas.mpl_connect('pick_event', self.onpick),
                                        "fig": fig}
            self.figcall[fig.number]['scales'] = self.fig_d(fig)

        if leg is not None:
            # Matplotlib 3.7 renamed Legend.legendHandles to legend_handles
            # and 3.9 removed the old name: try the new one first
            handles = getattr(leg, 'legend_handles', None)
            if handles is None:
                handles = leg.legendHandles
            self.figcall[fig.number].setdefault('ld', {}).update(self.line_d(handles, line))

    @staticmethod
    def fig_d(fig):
        """
        Set picker for yaxis.

        Parameters
        ----------
        fig: fig instance

        Returns
        -------
        dict:
            axes -> {current y scale: current y limits}

        """
        scales = {}
        for ax in fig.axes:
            ax.get_yaxis().set_picker(100)
            scales[ax] = {ax.get_yscale(): ax.get_ylim()}
        return scales

    @staticmethod
    def line_d(legh, line):
        """
        Set picker for legend.

        Parameters
        ----------
        legh: legendHandles
        line: line object

        Returns
        -------
        dict:
            a dictionary linking legend entry and line

        """
        lst = {}
        for l, o in zip(legh, line):
            l.set_picker(5)
            lst[l] = o
        return lst

    # def setlim(self, xmin, xmax, ymin, ymax):
    #     self.figcall[self.cn]['xm'] = xmin
    #     self.figcall[self.cn]['ym'] = ymin
    #     self.figcall[self.cn]['xM'] = xmax
    #     self.figcall[self.cn]['yM'] = ymax

    def onpick(self, event):
        """
        Switch lines visibility.

        A pick on a connected legend entry toggles its line(s); any other
        pick toggles the y scale of the picked artist's axes between
        linear and log.
        """
        fig_dict = self.figcall[event.canvas.figure.number]
        lline = event.artist
        # 'ld' only exists once a legend has been connected to the figure
        legend_lines = fig_dict.get('ld', {})

        if lline in legend_lines:
            oline = legend_lines[lline]
            vis = None
            if isinstance(oline, self.l2d):
                vis = not oline.get_visible()
                oline.set_visible(vis)
            else:
                for o in oline:
                    vis = not o.get_visible()
                    o.set_visible(vis)
            # vis stays None for an empty group: leave the entry untouched
            if vis is not None:
                lline.set_alpha(1.0 if vis else 0.2)
        else:
            ax = event.artist.axes
            scale = ax.get_yscale()
            known = fig_dict['scales'].setdefault(ax, {})
            # Remember the limits of the scale being left, to restore them later
            if scale not in known:
                known[scale] = ax.get_ylim()
            target = "linear" if scale == 'log' else 'log'
            ax.set_yscale(target)
            # No limits stored yet for the new scale: keep what was set
            with suppress(KeyError):
                ax.set_ylim(known[target])
        fig_dict['fig'].canvas.draw()

    def onkey(self, event):
        """On key events: Enter closes all figures, Space the current one."""
        if event.key == "enter":
            self.close('all')
        elif event.key == " ":
            self.close(event.canvas.figure.number)