import re
from numpy import zeros, genfromtxt, array, triu_indices, ndarray
from argparse import HelpFormatter, ArgumentParser
from sys import exit, argv
from os import makedirs, listdir
from mango.debug import debug
from mango.io import read_inputfile, read_data, restartfile_read, get_restartdata
from mango.constants import _variables, c, keys
from mango.managers import start_server
# def _parsingav(value):
# """Set the averaging of a data set to true or false."""
# return (value == "av")
def col_check(check):
    """
    Check that every requested plot column is available.

    The first column of the "x" and "y" entries is compared against
    ``c.columns_flat``; the arithmetic operators ``+ - * /`` are ignored.
    Anything left over is reported through ``c.Error``.

    Parameters
    ----------
    check: dict
        Plot description with "x" and "y" entries, each of which must
        support ``[:, 0]`` indexing

    """
    operators = {'+', '-', '*', '/'}
    unknown_x = set(check["x"][:, 0]).difference(c.columns_flat)
    unknown_y = set(check["y"][:, 0]).difference(c.columns_flat)
    # Operators are legitimate tokens of a column expression, not columns
    unknown = set([*unknown_x, *unknown_y]).difference(operators)
    if not unknown:
        return
    plural = "s" if len(unknown) > 1 else ""
    c.Error("F Invalid column{} {}, unable to plot graph".format(plural, unknown))
def list_add_arguments(self, k):
    """
    Parse arguments from dictionary.

    Every entry of ``k`` is registered on the parser with ``add_argument``.
    The keyword dictionary of each entry is updated in place: a missing
    'type' (when no 'action' is given) is filled from the entry, a 'help'
    template is formatted with the entry's default value (only the part
    after the last "/" for 'logs' entries) and 'default' is forced to None.
    If a TypeError is raised while doing so, the argument is registered
    from its flags alone.

    Parameters
    ----------
    k: OrderedDict
        Dictionary of keywords and defaults. Each value is indexed as
        ``value[0][0]`` (group name), ``value[0][1]`` (type),
        ``value[0][2]`` (default), ``value[1][0]`` (flag or list of flags)
        and ``value[1][1]`` (keyword arguments for ``add_argument``)

    """
    for entry in k.values():
        flags = entry[1][0]
        options = entry[1][1]
        if isinstance(flags, list):
            flags = tuple(flags)
        else:
            flags = [flags]
        try:
            spec = entry[0]
            if not ('type' in options or 'action' in options):
                options['type'] = spec[1]
            if 'help' in options:
                if spec[0] == 'logs':
                    shown = spec[2].rsplit("/", 1)[-1]
                else:
                    shown = spec[2]
                options['help'] = options['help'].format(shown)
            # A None default lets unset arguments be recognised later
            options['default'] = None
            self.add_argument(*flags, **options)
        except TypeError:
            self.add_argument(*flags)
def notebook_help(k=keys.words):
    """
    Print Help in Jupyter Notebook.

    The keywords are rendered as an HTML table which is displayed through
    IPython; the interpreter is then asked to exit with status 0.

    Parameters
    ----------
    k: dict
        dictionary of keywords

    """
    from IPython.display import HTML, display

    table_open = '<table><tr><th>VariableName</th><th>Help</th></tr>'
    table_row = '<tr><td>{}</td><td>{}</td></tr>'
    table_close = '</table>'
    markup = _help(k, table_open, table_row, _mlines_nb, table_close)
    display(HTML(markup))
    exit(0)
def _mlines_nb(s, it, i, j, fmt):
s += it.format(i, j[1][1]['help'].format(fmt)[2:])
return s
def interactive_help(k=keys.words):
    """
    Print commandline help.

    The keywords are printed as a two column text table; the interpreter
    is then asked to exit with status 0.

    Parameters
    ----------
    k: dict
        dictionary of keywords

    """
    header = 'VariableName' + 10 * ' ' + 'Help\n'
    header += '-' * 30 + '\n'
    row = '{:<22s}{}\n\n'
    footer = ''
    print(_help(k, header, row, _mlines_ih, footer))
    exit(0)
def _mlines_ih(s, it, i, j, fmt):
if len(j[1][1]['help'].split('\n')) > 1:
string = j[1][1]['help'].split('\n')
string[0] = string[0][2:]
string[-1] = string[-1].format(fmt)
newl = it.format(i, string[0])[:-1]
for line in string[1:]:
newl += it.format('', line)[:-1]
s += newl + '\n'
else:
s += it.format(i, j[1][1]['help'].format(fmt)[2:])
return s
def _help(k, st, it, func, end):
"""
Format help.
Parameters
----------
k: dict
dictionary of keywords
st: str
Header of help
it: str
table cells
func: function
entry formatter
end: str
final line of table
Returns
-------
s: str
formatted help string
"""
s = st
for i, j in k.items():
if j[0][0] == 'logs':
fmt = j[0][2].rsplit("/", 1)[-1]
else:
fmt = j[0][2]
s = func(s, it, i, j, fmt)
# if len(j[1][1]['help'].split('\n')) > 1:
# string = j[1][1]['help'].split('\n')
# string[0] = string[0][2:]
# string[-1] = string[-1].format(fmt)
# newl = it.format(i, string[0])[:-1]
# for line in string[1:]:
# newl += it.format('', line)[:-1]
# s += newl + '\n'
# else:
# s += it.format(i, j[1][1]['help'].format(fmt)[2:])
s += end
return s
def gethelp(opts, parser):
    """
    Help output selector.

    Nothing happens unless "-h" or "-ifh" was requested, either as
    ``opts`` itself or on the command line.

    Parameters
    ----------
    opts: object
        user options; compared against "-h" and "-ifh"
    parser: instance
        argparse parser used for the standard "-h" output

    """
    if opts == "-h" or '-h' in argv:
        c._banner()
        if c.using_IP:
            notebook_help(keys.words)
            return
        if c.interactive:
            interactive_help(keys.words)
            return
        parser.parse_args(['-h'])
        return
    if opts == '-ifh' or '-ifh' in argv:
        c._banner()
        interactive_help(keys.words)
def parse_in(k=keys.words):
    """
    Argparse function.

    Parameters
    ----------
    k: Ordereddict
        dictionary of available commands

    Returns
    -------
    parser: instance
        argparse parser instance

    """
    # The helper is attached to the ArgumentParser class itself, so every
    # parser created afterwards carries it too
    setattr(ArgumentParser, 'list_add_arguments', list_add_arguments)
    settings = {
        'prog': 'mango',
        'description': "Mango: magnetic nanoparticle simulator CLI Help",
        'formatter_class': SmartFormatter,
    }
    parser = ArgumentParser(**settings)
    parser.list_add_arguments(k)
    return parser
def array_vars(parsing, variables):
    """
    Set up arrays for variables.

    Each named attribute of ``parsing`` is replaced depending on its
    current value:

    * ``str``: used as a file name and loaded with ``numpy.genfromtxt``
    * scalar ``int`` or ``float``: broadcast to ``no_molecules`` entries
    * sequence of length ``no_molecules``: converted to a float array
    * sequence of length 1: its value is broadcast to ``no_molecules``
      entries

    Whenever a single value is broadcast, ``"<name>_1"`` is added to
    ``parsing.defaults``. Any other length is reported through ``c.Error``.

    Parameters
    ----------
    parsing: instance
        instance holding the variables, ``no_molecules`` and ``defaults``
    variables: list
        names of the attributes to convert

    Returns
    -------
    parsing: instance
        the same instance with the attributes converted

    """
    values = vars(parsing)
    for key in variables:
        value = values[key]
        z_arr = zeros(parsing.no_molecules)
        if isinstance(value, str):
            values[key] = genfromtxt(value, dtype=float)
        elif isinstance(value, (int, float)):
            # Integers are accepted as well; they previously fell through
            # to len() and raised a TypeError
            values[key] = value + z_arr
            parsing.defaults += ["{}_1".format(key)]
        elif len(value) == parsing.no_molecules:
            values[key] = array(value, dtype=float)
        elif len(value) == 1:
            values[key] = float(value[0]) + z_arr
            parsing.defaults += ["{}_1".format(key)]
        else:
            c.Error("F {} array must be the same length as No. MNPs".format(key))
    return parsing
def setdefaults(parsing, default_val):
    """
    Set default values for variables that weren't in input.

    A variable counts as missing when ``parsing`` has no such attribute or
    the attribute is None. The names of all variables filled in this way
    are stored, in order, in ``parsing.defaults``.

    Parameters
    ----------
    parsing: instance
        instance holding the parsed variables
    default_val: dict
        Default Values

    Returns
    -------
    parsing: instance
        the same instance with the missing variables filled in

    """
    namespace = vars(parsing)
    parsing.defaults = []
    for name, fallback in default_val.items():
        if namespace.get(name) is None:
            namespace[name] = fallback
            parsing.defaults.append(name)
    return parsing
@debug(['args'])
def parse(argparse, opts=None):
    """
    Parse user input.

    Parameters
    ----------
    argparse: bool
        Is argparse used?
    opts: dict
        dict of arguments if argparse isn't used; an empty dict is used
        when omitted

    Returns
    -------
    var, flg: instance
        variable and flag instances from ``splitvars``; for a restarted
        run the pair returned by ``restart_in`` instead

    """
    # A fresh dict per call: the former ``opts={}`` default was one object
    # shared by every call and handed on to read_inputfile
    if opts is None:
        opts = {}

    parser = parse_in(keys.words)
    gethelp(opts, parser)

    if argparse:
        parsing = parser.parse_args()
        if parsing.inputfile:
            parsing = read_inputfile(parsing.inputfile, parsing.restart)
    else:
        parsing = read_inputfile(opts)

    if parsing.restart:
        return restart_in(parsing)

    given = vars(parsing)
    if 'no_molecules' in given and 'boxsize' not in given:
        # NOTE(review): the factor 1.1 is applied to a user supplied radius
        # only, not to the default radius - confirm this is intended
        rad = 1.1 * parsing.radius if 'radius' in given else keys.defaults["radius"]
        parsing.boxsize = parsing.no_molecules * rad

    parsing = setdefaults(parsing, keys.defaults)

    # File creation switches, all off unless requested below
    parsing.files = {file: False for file in c.files}
    if parsing.cfile:
        if ['lf'] == parsing.cfile:
            c.Error("F No file type specified")
        parsing.lastframe = ('lf' in parsing.cfile)
        for i in set(parsing.cfile) & set(c.files):
            parsing.files[i] = True
    else:
        parsing.lastframe = None

    if parsing.column:
        parsing.column, parsing.files = column_manip(parsing.column, parsing.files)

    return splitvars(sanity(fixparam(array_vars(parsing, ['radius', 'dens']))))
def fixparam(p):
    """
    Modify parameters for units and types.

    Parameters
    ----------
    p: instance
        instance holding the parsed variables; modified in place

    Returns
    -------
    p: instance
        the same instance

    """
    # The pair indices stored by set_const are needed by calcsigmalim
    set_const(p.no_molecules)

    # Unit scaling
    p.nu *= 1e-12  # time [1e-12 s] -> [1e12 Hz]
    p.Mdens *= 1e-6  # [1e6 emu/g]
    p.ms = p.Mdens * p.dens  # [1e6 emu/g] * [g/mol]

    # Pair parameters; epsilon is only derived when it was not given
    p.sigma, p.limit = calcsigmalim(p.radius, p.no_molecules)
    if p.epsilon is None:
        p.epsilon = c.Ar_KB * (p.sigma / c.Ar_sigma)**3

    # Counters have to be integers
    for name in ('nmax', 'savechunk', 'run'):
        setattr(p, name, int(getattr(p, name)))

    # The number of statistics becomes the list of their indices
    p.stats = list(range(p.stats))
    p.field = (not p.suscep and p.H_0 != 0.0)

    # Run state
    p.extra_iter = None
    p.RandNoState = {}
    p.time = 0.0
    return p
def calcsigmalim(radius, np):
    """
    Calculate sigma and cutoff limit.

    Parameters
    ----------
    radius: ndarray
        radius of each particle
    np: int
        not used by the calculation

    Returns
    -------
    sigma, limit: ndarray
        for each index pair in ``c.tri``: the summed radii divided by
        2**(1/6), and the reciprocal of the summed radii

    """
    first = c.tri[0]
    second = c.tri[1]
    pair_sum = radius[first] + radius[second]
    scale = 2**(1 / 6)
    return pair_sum / scale, 1 / pair_sum
def splitvars(p):
    """
    Split parameters into variables and flags.

    Parameters
    ----------
    p: instance
        instance holding all parsed parameters

    Returns
    -------
    var, flg: instance
        variable and flag instances

    """
    var_names, flag_names = keys.flgorvar()
    values = p.__dict__

    # Class of variables and flags
    var = _variables(**{name: values[name] for name in var_names})
    flg = _variables(**{name: values[name] for name in flag_names})

    # Some Variables need changing based on input
    flg.pp = not flg.run == 0
    if flg.suscep == []:
        # An empty list is turned into a plain True switch
        flg.suscep = True
    if var.temp == 0:
        flg.neel = False
        flg.suscep = False
    return var, flg
def sanity(p):
    """
    Sanity checks.

    Problems are reported through ``c.Error``: a density that is not
    greater than zero, a negative temperature, a negative viscosity or an
    array boxsize that has neither 1 nor 3 elements.

    Parameters
    ----------
    p: instance
        instance holding the parsed variables; ``dens`` must be an ndarray

    Returns
    -------
    p: instance
        the same instance, unchanged

    """
    # Test every element: ``p.dens.any() <= 0.0`` compared a single bool
    # with zero and so let negative densities through
    if (p.dens <= 0.0).any():
        c.Error("F The density of all particles must be greater than zero")
    if p.temp < 0:
        c.Error("F Temperature must be positive")
    if p.eta < 0:
        c.Error("F Viscosity must be positive")
    # ``size != 1 or size != 3`` was true for every size and rejected all
    # array boxsizes
    if isinstance(p.boxsize, ndarray) and p.boxsize.size not in (1, 3):
        c.Error("F Boxsize must be 1 dimensional or 3 dimensional")
    return p
def filenaming(var, flg, run=1):
    """
    File naming.

    Avoid overwriting files etc. For a new run a directory named after
    the run parameters is created below ``var.logs`` and the first unused
    run number, counting up from ``run``, is stored in ``flg.run``. For
    post processing or a restart ``var.logs`` itself is used and
    ``flg.run`` is kept. The results are stored in ``var.directory`` and
    ``var.name``.

    Parameters
    ----------
    var, flg: instance
        class variable and flag instances
    run: int
        first run number to try for a new run

    """
    existing_run = flg.pp or flg.restart
    separator = "" if var.logs.endswith("/") else "/"

    if existing_run:
        var.directory = var.logs + separator
    else:
        if flg.suscep:
            field_tag = ''
        else:
            field_tag = '{:g}gauss_{:g}kHz_'.format(var.H_0 * 1e-6, var.nu * 1e-15)
        var.directory = "{}{}n{:g}_{:g}K_{}{:g}nm/".format(
            var.logs, separator, var.nmax, var.temp, field_tag, var.radius[0] * 10)
        # Make Log directory as needed
        makedirs(var.directory, exist_ok=True)

    # avoid overwriting other log files
    try:
        files = listdir(var.directory)
    except FileNotFoundError:
        c.Error("F Log directory does not exist")

    prefix = "S_" if flg.suscep else ""

    # Get run number for filename
    if not existing_run:
        while any(file.startswith('{}Run{:g}'.format(prefix, run)) for file in files):
            run += 1
        flg.run = run

    check_name = '{}Run{:g}_mol-'.format(prefix, flg.run)

    if flg.pp and not flg.ufile:
        from mango.pp.util import interactive_getfiles
        flg, check_name = interactive_getfiles(flg, check_name, files)

    # Add final name and save location to variables
    var.name = "{}{}{:g}.".format(var.directory, check_name, var.no_molecules)
def verbosity(var, flg):
    """
    Verbosity Changing.

    The output level ``flg.nout`` selects what ``c.header`` is called
    with, is forwarded through ``c.Error`` and decides whether the
    progress server is started (level 3 or more together with
    ``c.tinfo['otty']``).

    Parameters
    ----------
    var, flg: instance
        class variable and flag instances

    """
    level = flg.nout
    c.header(level >= 2, level >= 4)
    verb = (level >= 2)
    flg.prog = (level >= 3 and c.tinfo['otty'])

    c.Error("EV {} {} {} {}".format(verb, var.directory, flg.run, 1 if flg.pp else 0))

    # progress bar switches
    if not flg.prog:
        return
    c.progress = start_server("Prog")
    if flg.pp:
        return
    c.progress(f"setup {var.stats}")
def set_const(no_molecules):
    """
    Set the constants that depend on the number of particles.

    The indices of the upper triangle (diagonal excluded) of a
    ``no_molecules`` x ``no_molecules`` array are stored in ``c.tri`` and
    ``c.set_accuracy`` is called with ``c.EPS2``.

    Parameters
    ----------
    no_molecules: int
        number of particles

    """
    upper_pairs = triu_indices(no_molecules, k=1)
    c.tri = upper_pairs
    c.set_accuracy(c.EPS2)
def restart_in(parsing):
    """
    Get data from run to be restarted.

    Parameters
    ----------
    parsing: instance
        argparse parsing instance

    Returns
    -------
    restart, restartflags: instance
        variable and flag instances read from the restart data

    """
    filenames, save_type, directory, total = restartfile_read(parsing.restart)
    saved = read_data(filenames[0], save_type)
    restart, restartflags, written = get_restartdata(saved, filenames)

    # Remaining count per statistic: total minus what was written, minus one
    remaining = {}
    for k, writ in written.items():
        remaining[f'stat{k}'] = total - writ - 1
    restart.extra_iter = remaining

    restart.time = 0.0
    restart.stats = list(written.keys())

    # A walltime given for this call replaces the stored one
    if parsing.walltime is not None:
        restart.walltime = parsing.walltime

    restart_values = restart.__dict__
    restart_values['logs'] = directory
    restart_values['directory'] = directory

    restartflags.opt = False
    restartflags.restart = parsing.restart

    set_const(restart.no_molecules)
    return restart, restartflags
def column_manip(column_var, files):
    """
    Create a list of plots to be made.

    TODO cleanup

    The first entry of ``column_var`` provides the "x" data of the plot,
    all further entries are collected as "y" data. Spaces are removed,
    array indexing in square brackets is attached to the column it
    follows and expressions are split at the operators ``- + / *``.

    Parameters
    ----------
    column_var: list
        list parsed from user input
        eg. ["time kinetic[5,:,3] + avtrans_pot / - avmag_pot[:100]", "time kinetic"]
    files: dict
        files to create

    Returns
    -------
    plot_store: dict
        plots to create
    files: dict
        files to create

    """
    operator_split = '([-+/*])'

    def search(key):
        """
        Search for a key in the known columns.

        Every entry of ``c.columns`` containing the key, or the key
        without its first two characters (presumably an "av" prefix),
        is switched on in ``files``.

        Parameters
        ----------
        key: str
            key to find

        Returns
        -------
        key: str
            found key
        inlist: bool
            was the key found? always True for "time"

        """
        if key == "time":
            return [key, True]
        found = False
        for name in c.columns.keys():
            if key in c.columns[name] or key[2:] in c.columns[name]:
                files[name] = True
                found = True
        return [key, found]

    def rearray(pieces):
        """
        Rearrayify array splitting.

        Parameters
        ----------
        pieces: list
            strings split at square brackets; every second element is
            array indexing like 5,:

        Returns
        -------
        pieces: list
            the same list with proper array indexes eg [5,:]

        """
        for pos in range(1, 2 * (len(pieces) // 2), 2):
            pieces[pos] = "[" + pieces[pos] + "]"
        return pieces

    def shapedlist(pieces, store):
        """Pair each expression with its index, '' marks a 'missing' index."""
        padded = pieces + [''] if len(pieces) % 2 == 1 else pieces
        pairs = array(padded).reshape(-1, 2)
        for expression, index in zip(pairs[:, 0], pairs[:, 1]):
            # split elements at operations
            tokens = filter(None, re.split(operator_split, expression))
            store.extend([search(token) for token in tokens])
            if index != '':
                # The index replaces the found flag of the last element
                store[-1][1] = index

    x_columns = []
    y_columns = []
    for position, entry in enumerate(column_var):
        target = x_columns if position == 0 else y_columns
        pieces = re.split(r'[\[\]]', entry.replace(" ", ""))
        if len(pieces) > 1:
            shapedlist(rearray(pieces), target)
            continue
        for piece in pieces:
            for token in re.split(operator_split, piece):
                target.append(search(token))

    plot_no = 1
    plot_key = "plot_{}".format(plot_no)
    plot_store = {plot_key: {"x": array(x_columns), "y": array(y_columns)}}
    col_check(plot_store[plot_key])
    return plot_store, files
# Nothing is executed when this module is run as a script.
if __name__ == "__main__":
    pass