Source code for mango.pp.acf

from numpy import (arange, array, conjugate, where, block,
                   mean, amax, zeros_like, zeros, var, sum as nsum, linalg,
                   sqrt, savetxt, einsum, exp, log, log2, diag)
from scipy.optimize import fminbound, curve_fit
from scipy.interpolate import interp1d
from math import pi
from os import path
from contextlib import suppress

from mango.constants import c, nestedDict, asciistring
from mango.pp.util import at_least2d_end # debye,
import mango.imports as imports

from mango.debug import debug

# from numpy import seterr

# seterr(all='raise')


[docs]def get_suscep(flg, skip, dt, sus_data, blocks, stats, mass, vol, ms): """ Calculate the ACF of data for different properties. Parameters ---------- Returns ------- atc_data: dict A dictionary of all the calculated ACF """ autocorr_labels = {"mag": {"x": r'$\omega$ [$10^{{{}}}$]', "y": r"MDOS [$10^{{{}}}$]", "type": "MDOS"}, "vel": {"x": r'$\omega$ [$10^{{{}}}$]', "y": r"VDOS [$10^{{{}}}$]", "type": "VDOS"}, "angular": {"x": r'$\omega$ [$10^{{{}}}$]', "y": r"ADOS [$10^{{{}}}$]", "type": "ADOS"}, "inertia": {"x": r'$\omega$ [$10^{{{}}}$]', "y": r"IDOS [$10^{{{}}}$]", "type": "IDOS"}, "rotation": {"x": r'$\omega$ [$10^{{{}}}$]', "y": r"RDOS [$10^{{{}}}$]", "type": "RDOS"}, "mag_ifft": {"x": r"Time [$10^{{{}}}$s]", "y": r"MACF [$10^{{{}}}$]", "type": "MACF"}, "vel_ifft": {"x": r"Time [$10^{{{}}}$s]", "y": r"VACF [$10^{{{}}}$]", "type": "VACF"}, "angular_ifft": {"x": r"Time [$10^{{{}}}$s]", "y": r"AACF [$10^{{{}}}$]", "type": "AACF"}, "inertia_ifft": {"x": r"Time [$10^{{{}}}$s]", "y": r"IACF [$10^{{{}}}$]", "type": "IACF"}, "rotation_ifft": {"x": r"Time [$10^{{{}}}$s]", "y": r"RACF [$10^{{{}}}$]", "type": "RACF"}} if 'vel' in flg.suscep: sus_data["vel"] = einsum("hijk, j -> hijk", sus_data["mom"], 1 / mass) atc_data = nestedDict() autocorr = Autocorr(flg, sus_data, dt, skip, ms * vol) autocorr.conv(len(stats)) for bl in blocks: autocorr.analysis(blocks=bl) for i in flg.suscep: atc_data[bl][i] = {"xaxis": autocorr.freq, "yaxis": autocorr.ret[i]['data'], "stddev_y": autocorr.ret[i]['error'], # "popt": tau, "popt_fitted": ad_tau.popt_fit, "autocorr_labels": autocorr_labels[i], 'scale': autocorr.ret[i]['scale']} iatc = i + '_ifft' atc_data[bl][iatc] = {"xaxis": autocorr.time, "yaxis": autocorr.ret[i]['ifft'], "stddev_y": autocorr.ret[i]['ifft_error'], # "popt": tau, "popt_fitted": ad_tau.popt_fit, "autocorr_labels": autocorr_labels[iatc], 'scale': autocorr.ret[i]['scale']} return atc_data
[docs]class Autocorr(): """Calculates the ACF and MDOS of provided data.""" @debug(['acf']) def __init__(self, flg, sus_data, dt, skip, mmag): """ Store data to be processed. Parameters ---------- flg: instance flags class instance sus_data: dict data dictionary dt: float timestep skip: int number of iterations skipped mmag: float Mmag """ self.flg = flg self.dt = dt self.skip = skip self.mmag = mmag self.fft, self.ffts, self.ifft, self.byte_align = imports.fftw() self.shape = sus_data[list(sus_data.keys())[0]].shape self.ret = {} self.sus_collect = {} self._shape_data(sus_data) if hasattr(self, 'DBG'): self.sus_data = sus_data.copy() def _calc(self, func, *args): # Paralisable loop c.processors/numkeys for autocorr_type in self.sus_collect.keys(): func(autocorr_type, *args) if hasattr(self, 'DBG'): from mango.pp.acf_checker import checker, autoold ret2, freq2, time2 = autoold(c.Error, self.sus_data, self.dt, self.skip, self.mmag, blocks=self.blocks) checker(self.ret, ret2, self.freq, freq2, self.time, time2)
[docs] def conv(self, no_stats, RMS=True): """Carry out blocking convergence tests.""" self._calc(self._blockingconvergence, no_stats, RMS)
[docs] def analysis(self, blocks): """ Carry out DOS and ACF calculation for provided data and blocks. Parameters ---------- blocks: int number of blocks """ self.sample = self.shape[0] * self.shape[1] // blocks self.sample_pad = self.ffts.next_fast_len(2 * self.sample - 1) self.freq = 2.0 * pi * self.ffts.fftshift(self.ffts.fftfreq(self.sample_pad, d=(self.dt * self.skip))) self.norm_val = self.freq[1] - self.freq[0] # arange(-self.sample + 1, self.sample) * self.dt * self.skip self.time = 2. * pi * self.ffts.fftshift(self.ffts.fftfreq(self.sample_pad, d=self.norm_val)) self.blocks = blocks self._calc(self._blockinganalysis)
def _shape_data(self, sus_data): """ Shape arrays dependent on type. Parameters ---------- sus_data: dict dict of data arrays """ reshape = [] for autocorr_type in self.flg.suscep: # if autocorr_type in ['chi']: # continue # else: value = sus_data[autocorr_type] if autocorr_type in ['mag']: # Sum over the number of atoms # Flatten and refold -> Blocking will only trim from # first trajetory for correct dimensions # data = # stddev = self._get_av_err(data, av=False) # - (stddev if self.flg.align else 0) # - (mean(einsum("il, il ->il", data[:, 0], data[:, 0]), axis=1) if self.flg.align else 0 data = self._remove_av(einsum('ijkl->ijl', value)).reshape(-1, 3) elif autocorr_type in ["rotation"]: if self.flg.align: data = self._remove_av(einsum('ijkl, il -> ijk', value, mean(einsum("ijkz ->ijz", sus_data["mag"]), axis=1))).reshape(-1, 3) else: c.Error("W rotation autocorrelation only works with align=True.") else: data = self._remove_av(value).reshape([-1, *value.shape[2:]]) if autocorr_type in ['inertia', "vel"]: reshape = [-1, 3] else: reshape = [3] print(autocorr_type, data.shape) e_string = "{0}, {0} -> ".format(''.join(asciistring(data.ndim))) self.sus_collect[autocorr_type] = {'data': data, 'scale': 2 * pi * einsum(e_string, data, data) / data.size, 'reshapeto': reshape} @staticmethod def _remove_av(value): """ Remove the average over the iterations before any modifcation. Trajectory may not be fully relaxed Removing the mean after blocking leads to spurious errors """ return value - mean(value, axis=1)[:, None] def _fft_av_err(self, dummy, scale, blocks): """Scale fft errors.""" ave, err = self._get_av_err(dummy, blocks) print(ave[0]) return scale * self.ffts.fftshift(ave), scale * self.ffts.fftshift(err) @staticmethod def _get_av_err(dummy, blocks=1, axis=0, av=True): """ Average data over blocks and calculate errors. Parameters ---------- dummy: array data array blocks: int number of blocks Returns ------- ave: array averaged data array err: array standard deviation of averaged data """ if dummy.shape[0] == 1: err = zeros(dummy.shape[1:]) else: err = sqrt(var(dummy, ddof=1, axis=axis) / blocks) return (mean(dummy, axis=axis), err) if av else err def _blockingconvergence(self, autocorr_type, no_stats, RMS=True): """ Convergence test for blocking. Parameters ---------- autocorr_type: str autocorr name RMS: bool Root mean squared or mean """ print('# Block averages {}'.format('magnetic moment [emu]' if autocorr_type == 'mag' else autocorr_type)) print('# Block, sample, average, error') data = self.sus_collect[autocorr_type]['data'] if RMS: def RMSwrap(data, **kw): e_string = "{0}, {0} -> ab".format(''.join(asciistring(data.ndim))) return sqrt(mean(einsum(e_string, data, data), **kw)) else: def RMSwrap(data, **kw): e_string = "{} -> ab".format(''.join(asciistring(data.ndim))) return mean(einsum(e_string, data), **kw) for local_sample in (no_stats * 2**x for x in range(int(log2(self.shape[0] * self.shape[1]) - log2(self.shape[0])))): no_blocks = self.shape[0] * self.shape[1] // local_sample blocked = data[:local_sample * no_blocks].reshape([no_blocks, local_sample, *self.sus_collect[autocorr_type]['reshapeto']]) reblock = RMSwrap(blocked, axis=1) ave, err = self._get_av_err(reblock, no_blocks) print(local_sample, no_blocks, ave, err) # if local_sample > 1 else 0) @staticmethod def _get_estring(ndim): """ Return the 3 character strings for einsum of DOS. eg 3 dimensions would return: dos = 'abc' cdos = 'abd' fstr = 'abcd' Parameters ---------- ndim: int number of dimensions of array Returns ------- dos_str: str string of char length ndim cdos_str: str string of char length ndim fstr: str string of char length of ndim + 1 """ chr_list = asciistring(ndim + 1) dos_str = ''.join(chr_list[:-1]) cdos_str = ''.join(chr_list[:-2] + [chr_list[-1]]) return dos_str, cdos_str, ''.join(chr_list) @staticmethod def _pmsum(data): # TODO per particle correleation # if shape[1] == no_mol and shape[0] == iter (?) # if data.ndim == 3: # return einsum("ijk -> ik", data) return data def _blockinganalysis(self, autocorr_type): """ DOS and ACF calculation. creates storage dictionary 'ret' Parameters ---------- autocorr_type: str autocorr name """ scale = self.sus_collect[autocorr_type]['scale'] reshapeto = [self.blocks, self.sample, *self.sus_collect[autocorr_type]['reshapeto']] # Scale # if autocorr_type is 'mag': # 2*pi*<M*M>/3 # # scale = 2. * pi * einsum("i, i -> ", self.mmag, self.mmag) / (3. * self.mmag.size) # scale = einsum("ijk, ijk -> ", mxyz, mxyz) / mxyz.size # This is equal to <M*M>/3 for independent particles # else: # 2*pi*<v*v>/3, 2*pi*<M*M>/3 # Blocking print(autocorr_type, self.sus_collect[autocorr_type]['data'].shape) mxyz = self.sus_collect[autocorr_type]['data'][:self.sample * self.blocks].reshape(reshapeto) self.ret[autocorr_type] = {'scale': scale} print('DOS') # DOS dos = self.fft(self.byte_align(mxyz), n=self.sample_pad, axis=1, threads=c.processors) print("DOne") # modulus of the data gives separate real and imaginary parts dos = self._pmsum(dos) dos_str, cdos_str, f_str = self._get_estring(dos.ndim) mod_dos = einsum("{}, {} -> {}".format(dos_str, cdos_str, f_str), dos, conjugate(dos)) # Calculate eigenvalues eig_dos = linalg.eigvalsh(mod_dos) # Normalisation: Integral of DOS equal 1 norm = self.norm_val * einsum('{} -> '.format(dos_str), eig_dos).real / self.blocks eig_dos /= norm print('IDOS') # Normalisation: ACF(0) = 1 ifftx = 2 * pi * self.ifft(self.byte_align(eig_dos), axis=1, threads=c.processors) / (self.dt * self.skip) print("DOne") # Error and averaging (self.ret[autocorr_type]['data'], self.ret[autocorr_type]['error']) = self._fft_av_err(eig_dos, scale, self.blocks) (self.ret[autocorr_type]['ifft'], self.ret[autocorr_type]['ifft_error']) = self._fft_av_err(ifftx, scale, self.blocks)
[docs]@debug(['save']) def save_suscep(flg, directory, run, no_mol, cblock, xaxis, yaxis, stddev_y, autocorr_labels, scale, line_d={}, **kwargs): """ Save the susceptibility calculation. Saves autocorrelation data in raw text, The columns are shown below with yaxis and std_y will always be 3 columns: xaxis yaxis... std_y... Parameters ---------- directory: str save location run: int run number no_mol: int number of particles cblock: int current block length xaxis: ndarray x data yaxis: ndarray y data stddev_y: ndarray y error data autocorr_labels: dict dictionary of x and y labels scale: float scale value for data """ # if (path.isfile(figname) or path.isfile("{}{}.dat".format(figname, "ALIGN" if flg.align else ''))) and flg.saveg: # c.Error('W File {} exists, skipping'.format(figname.rsplit('/', 1)[-1])) # return None xaxis, yaxis, stddev_y = at_least2d_end(xaxis, yaxis, stddev_y) if ('DOS' in autocorr_labels["type"]) or ('ACF' in autocorr_labels["type"]): header = f'scale factor: {scale}' # Remove 2 pi which is from Fourier transform definintion data = block([xaxis, yaxis, stddev_y]) print(data.shape, xaxis.shape, yaxis.shape, stddev_y.shape) name = "{}S_Run{}_mol-{:g}_autocorr_{}_blk{}".format(directory, run, no_mol, autocorr_labels["type"], cblock) savetxt("{}{}.dat".format(name, "ALIGN" if flg.align else ''), data, header=header)