Source code for mango.arguments

import re
from numpy import zeros, genfromtxt, array, triu_indices, ndarray
from argparse import HelpFormatter, ArgumentParser
from sys import exit, argv
from os import makedirs, listdir

from mango.debug import debug
from mango.io import read_inputfile, read_data, restartfile_read, get_restartdata
from mango.constants import _variables, c, keys
from mango.managers import start_server


# def _parsingav(value):
#     """Set the averaging of a data set to true or false."""
#     return (value == "av")


[docs]def col_check(check): """Check columns are available.""" diffx = set(check["x"][:, 0]).difference(c.columns_flat) diffy = set(check["y"][:, 0]).difference(c.columns_flat) diff = set(list(diffx) + list(diffy)).difference(set(['+', '-', '*', '/'])) if diff != set([]): c.Error("F Invalid column{} {}, unable to plot graph".format( "s" if len(list(diff)) > 1 else "", diff))
[docs]class SmartFormatter(HelpFormatter): """ Smart help formatter. Allows for splitting the help over multiple lines by starting help with 'R|' """ def _split_lines(self, text, width): if text.startswith('R|'): return text[2:].splitlines() return HelpFormatter._split_lines(self, text, width) def _format_action_invocation(self, action): if not action.option_strings: metavar, = self._metavar_formatter(action, action.dest)(1) return metavar else: parts = [] # if the Optional doesn't take a value, format is: # -s, --long if action.nargs == 0: parts.extend(action.option_strings) # if the Optional takes a value, format is: # -s ARGS, --long ARGS else: default = action.dest.upper() args_string = self._format_args(action, default) # parts.append('%s %s' % (option_string, args_string)) ### this is change ls = args_string.split(" ", 1) if len(ls) == 2: args_string = ls[1][:-1] if ls[0].startswith('[') else ls[1] parts.extend(action.option_strings) parts[-1] += ' %s' % args_string return ', '.join(parts)
[docs]def list_add_arguments(self, k): """ Parse arguments from dictionary. Parameters ---------- k: OrderedDict Dictionary of keywords and defaults """ for i, j in k.items(): args = j[1][0] kw = j[1][1] args = tuple(args) if isinstance(args, list) else [args] try: if 'type' not in kw and 'action' not in kw: kw['type'] = j[0][1] if 'help' in kw: kw['help'] = kw['help'].format(j[0][2].rsplit("/", 1)[-1] if j[0][0] == 'logs' else j[0][2]) kw['default'] = None self.add_argument(*args, **kw) except TypeError: self.add_argument(*args)
[docs]def notebook_help(k=keys.words): """Print Help in Jupyter Notebook.""" from IPython.display import HTML, display st = '<table><tr><th>VariableName</th><th>Help</th></tr>' it = '<tr><td>{}</td><td>{}</td></tr>' end = '</table>' s = _help(k, st, it, _mlines_nb, end) display(HTML(s)) exit(0)
def _mlines_nb(s, it, i, j, fmt): s += it.format(i, j[1][1]['help'].format(fmt)[2:]) return s
[docs]def interactive_help(k=keys.words): """Print commandline help.""" st = 'VariableName' + 10 * ' ' + 'Help\n' + '-' * 30 + '\n' it = '{:<22s}{}\n\n' end = '' s = _help(k, st, it, _mlines_ih, end) print(s) exit(0)
def _mlines_ih(s, it, i, j, fmt): if len(j[1][1]['help'].split('\n')) > 1: string = j[1][1]['help'].split('\n') string[0] = string[0][2:] string[-1] = string[-1].format(fmt) newl = it.format(i, string[0])[:-1] for line in string[1:]: newl += it.format('', line)[:-1] s += newl + '\n' else: s += it.format(i, j[1][1]['help'].format(fmt)[2:]) return s def _help(k, st, it, func, end): """ Format help. Parameters ---------- k: dict dictionary of keywords st: str Header of help it: str table cells func: function entry formatter end: str final line of table Returns ------- s: str formatted help string """ s = st for i, j in k.items(): if j[0][0] == 'logs': fmt = j[0][2].rsplit("/", 1)[-1] else: fmt = j[0][2] s = func(s, it, i, j, fmt) # if len(j[1][1]['help'].split('\n')) > 1: # string = j[1][1]['help'].split('\n') # string[0] = string[0][2:] # string[-1] = string[-1].format(fmt) # newl = it.format(i, string[0])[:-1] # for line in string[1:]: # newl += it.format('', line)[:-1] # s += newl + '\n' # else: # s += it.format(i, j[1][1]['help'].format(fmt)[2:]) s += end return s
[docs]def gethelp(opts, parser): """Help output selector.""" if opts == "-h" or '-h' in argv: c._banner() if c.using_IP: notebook_help(keys.words) elif c.interactive: interactive_help(keys.words) else: parser.parse_args(['-h']) elif opts == '-ifh' or '-ifh' in argv: c._banner() interactive_help(keys.words)
[docs]def parse_in(k=keys.words): """ Argparse function. Parameters ---------- k: Ordereddict dictionary of available commands Returns ------- parser: instance argparse parser instance """ ArgumentParser.list_add_arguments = list_add_arguments parser = ArgumentParser(prog='mango', description="Mango: magnetic nanoparticle simulator CLI Help", formatter_class=SmartFormatter) parser.list_add_arguments(k) return parser
[docs]def array_vars(parsing, variables): """Set up arrays for variables.""" for key in variables: z_arr = zeros(parsing.no_molecules) if isinstance(parsing.__dict__[key], str): parsing.__dict__[key] = genfromtxt(parsing.__dict__[key], dtype=float) elif isinstance(parsing.__dict__[key], float): parsing.__dict__[key] += z_arr parsing.defaults += ["{}_1".format(key)] elif len(parsing.__dict__[key]) == parsing.no_molecules: parsing.__dict__[key] = array(parsing.__dict__[key], dtype=float) elif len(parsing.__dict__[key]) == 1: parsing.__dict__[key] = float(parsing.__dict__[key][0]) + z_arr parsing.defaults += ["{}_1".format(key)] else: c.Error("F {} array must be the same length as No. MNPs".format(key)) return parsing
[docs]def setdefaults(parsing, default_val): """ Set default values for variables that weren't in input. Parameters ---------- parsing: instance ArgumentParser instance default_val: dict Default Values Returns ------- parsing: instance ArgumentParser instance """ parsing.defaults = [] for key, value in default_val.items(): if key not in parsing.__dict__ or parsing.__dict__[key] is None: parsing.__dict__[key] = default_val[key] parsing.defaults += [key] return parsing
[docs]@debug(['args']) def parse(argparse, opts={}): """ Parse user input. Parameters ---------- argparse: bool Is argparse used? opts: dict dict of arguments if argparse isn't used Returns ------- parsing: instance instance of variables """ parser = parse_in(keys.words) gethelp(opts, parser) if argparse: parsing = parser.parse_args() parsing = read_inputfile(parsing.inputfile, parsing.restart) if parsing.inputfile else parsing else: parsing = read_inputfile(opts) if parsing.restart: return restart_in(parsing) if ('no_molecules' in parsing.__dict__ and 'boxsize' not in parsing.__dict__): rad = 1.1 * parsing.radius if 'radius' in parsing.__dict__ else keys.defaults["radius"] parsing.boxsize = parsing.no_molecules * rad parsing = setdefaults(parsing, keys.defaults) # parsing.average = {av: False for av in c.averages} parsing.files = {file: False for file in c.files} if parsing.cfile: if ['lf'] == parsing.cfile: c.Error("F No file type specified") parsing.lastframe = ('lf' in parsing.cfile) getfiles = set(parsing.cfile) & set(c.files) for i in getfiles: parsing.files[i] = True # parsing.average[c.averages[c.files.index(i)]] = _parsingav(parsing.cfile[parsing.cfile.index(i) - 1]) else: parsing.lastframe = None if parsing.column: parsing.column, parsing.files = column_manip(parsing.column, parsing.files) return splitvars(sanity(fixparam(array_vars(parsing, ['radius', 'dens']))))
[docs]def fixparam(p): """Modify parameters for units and types.""" set_const(p.no_molecules) p.Mdens *= 1e-6 # [1e6 emu/g] p.sigma, p.limit = calcsigmalim(p.radius, p.no_molecules) p.epsilon = c.Ar_KB * (p.sigma / c.Ar_sigma)**3 if p.epsilon is None else p.epsilon p.ms = p.Mdens * p.dens # [1e6 emu/g] * [g/mol] p.nu *= 1e-12 # time [1e-12 s] -> [1e12 Hz] p.extra_iter = None p.RandNoState = {} p.nmax = int(p.nmax) p.savechunk = int(p.savechunk) p.run = int(p.run) p.time = 0.0 p.field = (not p.suscep and p.H_0 != 0.0) p.stats = list(range(p.stats)) return p
[docs]def calcsigmalim(radius, np): """Calculate sigma and cutoff limit.""" dists = radius[c.tri[0]] + radius[c.tri[1]] return dists / (2**(1 / 6)), 1 / dists
[docs]def splitvars(p): """ Split parameters into variables and flags. Parameters ---------- p: instance argparse instance Returns ------- var, flg: instance variable and flag instances """ # Store variables and flags var_list, flags_list = keys.flgorvar() def store(lis, dic): nd = {} for l in lis: nd[l] = dic[l] return nd # Class of variables and flags var = _variables(**store(var_list, p.__dict__)) flg = _variables(**store(flags_list, p.__dict__)) # Some Variables need changing based on input flg.pp = not flg.run == 0 if flg.suscep == []: flg.suscep = True if var.temp == 0: flg.neel = flg.suscep = False return var, flg
[docs]def sanity(p): """Sanity checks.""" if p.dens.any() <= 0.0: c.Error("F The density of all particles must be greater than zero") if p.temp < 0: c.Error("F Temperature must be positive") if p.eta < 0: c.Error("F Viscosity must be positive") if isinstance(p.boxsize, ndarray) and (p.boxsize.size != 1 or p.boxsize.size != 3): c.Error("F Boxsize must be 1 dimensional or 3 dimensional") return p
[docs]def filenaming(var, flg, run=1): """ File naming. Avoid overwriting files etc. Parameters ---------- var, flg: instance class variable and flag instances """ if flg.pp or flg.restart: var.directory = var.logs if var.logs.endswith("/") else var.logs + "/" else: var.directory = "{}{}n{:g}_{:g}K_{}{:g}nm/".format( var.logs, "" if var.logs.endswith("/") else "/", var.nmax, var.temp, '' if flg.suscep else '{:g}gauss_{:g}kHz_'.format(var.H_0 * 1e-6, var.nu * 1e-15), var.radius[0] * 10) # Make Log directory as needed makedirs(var.directory, exist_ok=True) # avoid overwriting other log files try: files = listdir(var.directory) except FileNotFoundError: c.Error("F Log directory does not exist") # Get run number for filename if not flg.pp and not flg.restart: while any(file.startswith('{}Run{:g}'.format("S_" if flg.suscep else "", run)) for file in files): run += 1 flg.run = flg.run if flg.pp or flg.restart else run check_name = '{}Run{:g}_mol-'.format("S_" if flg.suscep else "", flg.run) if flg.pp and not flg.ufile: from mango.pp.util import interactive_getfiles flg, check_name = interactive_getfiles(flg, check_name, files) # Add final name and save location to variables var.name = "{}{}{:g}.".format(var.directory, check_name, var.no_molecules)
[docs]def verbosity(var, flg): """ Verbosity Changing. Parameters ---------- var, flg: instance class variable and flag instances """ c.header(flg.nout >= 2, flg.nout >= 4) verb = (flg.nout >= 2) flg.prog = (flg.nout >= 3 and c.tinfo['otty']) c.Error("EV {} {} {} {}".format(verb, var.directory, flg.run, 1 if flg.pp else 0)) # progress bar switches if flg.prog: c.progress = start_server("Prog") if not flg.pp: # (flg.column is not False or flg.suscep or flg.files['xyz']): c.progress(f"setup {var.stats}")
[docs]def set_const(no_molecules): c.tri = triu_indices(n=no_molecules, k=1) c.set_accuracy(c.EPS2)
[docs]def restart_in(parsing): """ Get data from run to be restarted. Parameters ---------- parsing: instance argparse parsing instance Returns ------- variables: instance class instance of all variables to be used """ filenames, save_type, directory, total = restartfile_read(parsing.restart) restart, restartflags, written = get_restartdata(read_data(filenames[0], save_type), filenames) restart.extra_iter = {f'stat{k}': total - writ - 1 for k, writ in written.items()} restart.time = 0.0 restart.stats = list(written.keys()) if parsing.walltime is not None: restart.walltime = parsing.walltime for i in ['logs', 'directory']: restart.__dict__[i] = directory restartflags.opt = False restartflags.restart = parsing.restart set_const(restart.no_molecules) return restart, restartflags
[docs]def column_manip(column_var, files): """ Create a list of plots to be made. TODO cleanup Parameters ---------- column_var: list list parsed from user input eg. ["time kinetic[5,:,3] + avtrans_pot / - avmag_pot[:100]", "time kinetic"] files: dict files to create Returns ------- plot_store: dict plots to create files: dict files to create """ def search(key): """ Search for averaging of data. Parameters ---------- key: str key to find Returns ------- key: str found key inlist: bool is key data to be averaged? """ inlist = False if key == "time": return [key, True] for enum, i in enumerate(c.columns.keys()): if key in c.columns[i] or key[2:] in c.columns[i]: files[i] = True inlist = True # if not average_var[c.averages[enum]] or key[:2] == 'av': # average_var[c.averages[enum]] = _parsingav(key[:2]) # key = key[2:] if average_var[c.averages[enum]] else key return [key, inlist] def rearray(j): """ Rearrayify array splitting. Parameters ---------- j: str array indexing like string eg 5,: Returns ------- j: str proper array index eg [5,:] """ for ind in range(len(j) // 2): j[(ind * 2) + 1] = "[" + j[(ind * 2) + 1] + "]" return j def shapedlist(j, cols_list_arr): """Shape array so its square also marker for 'missing' array.""" arr_j = array(j + [''] if len(j) % 2 == 1 else j).reshape(-1, 2) # split elements at operations for no, k in enumerate(arr_j[:, 0]): cols_list_arr += [search(cl) for cl in filter(None, re.split('([-+/*])', k))] if arr_j[no, 1] != '': cols_list_arr[-1][1] = arr_j[no, 1] cols_list_arr = [] cols_list = [] plot_no = 1 plot_store = {} for i, j in enumerate(column_var): # Plotting multiple graphs with multiple lines, split to new graph # if j == ",": # plot_store["plot_{}".format(plot_no)] = {"x": array(cols_list), "y": array(cols_list_arr)} # col_check(plot_store["plot_{}".format(plot_no)]) # cols_list_arr = [] # cols_list = [] # plot_no += 1 # continue j = re.split(r'[\[\]]', j.replace(" ", "")) if len(j) > 1: shapedlist(rearray(j), cols_list) if i == 0 else shapedlist(rearray(j), cols_list_arr) else: for k in j: colum = re.split('([-+/*])', k) for cl in colum: if i == 0: cols_list += [search(cl)] else: cols_list_arr += [search(cl)] # if j != ",": plot_store["plot_{}".format(plot_no)] = {"x": array(cols_list), "y": array(cols_list_arr)} col_check(plot_store["plot_{}".format(plot_no)]) return plot_store, files
if __name__ == "__main__": pass