import collections
import copy
import glob
import logging
import os
import dill
import matplotlib.pyplot as plt
import numba
import numpy as np
import pandas
from mewarpx.diags_store import diag_base, timeseries
from mewarpx.mwxrun import mwxrun
from mewarpx.utils_store import parallel_util
from mewarpx.utils_store.appendablearray import AppendableArray
import mewarpx.utils_store.util as mwxutil
from pywarpx import callbacks
logger = logging.getLogger(__name__)
[docs]class ParticleCSVDiag(diag_base.WarpXDiagnostic):
"""Handles writing out, and accumulating real-time, a per-step record of
charge injected or deposited. Provides a base for specific implementations.
At present species_name is a required column and must be str.
"""
# Columns not to accumulate internally
columns_no_accumulate = ['t', 'step', 'species_id']
def __init__(self, diag_steps, write_dir=None, save_name=None,
**kwargs):
"""Basic initialization.
If ``write_dir`` or ``save_name`` are ``None``, no data is saved to file
and only accumulated data is kept.
Arguments:
diag_steps (int): Number of steps between each output
write_dir (string): Directory to write CSV files to.
save_name(string): Full filename within the directory to write CSV
files to.
kwargs: See :class:`mewarpx.diags_store.diag_base.WarpXDiagnostic`
for more timing options.
"""
self.save = (write_dir is not None) and (save_name is not None)
if self.save:
self.write_dir = write_dir
self.save_path = os.path.join(self.write_dir, save_name)
self.accumulate_dict = {}
super(ParticleCSVDiag, self).__init__(
diag_steps=diag_steps, **kwargs)
def _charge_accum_diag(self):
"""Present only for backwards compatibility with runs that installed
this diagnostic.
"""
pass
[docs] def charge_accum_diag(self):
"""Generate flux dataframe; write to CSV if requested.
Called by the FluxDiag owning this at appropriate timesteps.
"""
df = self._collect_dataframe()
self._accumulate_columns(df)
if self.save:
self._write_dataframe(df)
return df
def _collect_dataframe(self):
"""Gather the dataframe of per-step information and return it."""
raise NotImplementedError("Must implement in child object.")
def _accumulate_columns(self, df, accumulate_dict=None):
"""Track summed quantities only
accumulate_dict not None will update that dict rather than the
built-in dict.
"""
if accumulate_dict is None:
accumulate_dict = self.accumulate_dict
species_list = np.unique(df['species_id'])
for column in df.columns:
if column not in self.columns_no_accumulate:
for species_id in species_list:
accumulate_dict[(column, species_id)] = (
accumulate_dict.setdefault((column, species_id), 0.)
+ np.sum(df[df['species_id'] == species_id][column])
)
[docs] def get_species_list(self):
"""Get sorted list of all species names recorded by this diagnostic."""
species_list = sorted(list(set(
[x for _, x in list(self.accumulate_dict.keys())]
)))
return species_list
def _write_dataframe(self, df):
"""Write the actual dataframe to file, append if possible."""
if mwxrun.me == 0:
# Write a header only if it's a new file
if os.path.exists(self.save_path):
df.to_csv(self.save_path, mode='a', header=False, index=False)
else:
df.to_csv(self.save_path, mode='w', header=True, index=False)
[docs] def get_updated_accumulators(self):
"""Return up-to-date accumulated values in a separate dictionary,
without clearing and/or writing the values.
"""
acdict = self.accumulate_dict.copy()
df = self._collect_dataframe(clear=False)
self._accumulate_columns(df, accumulate_dict=acdict)
return acdict
[docs]class InjectorFluxDiag(ParticleCSVDiag):
"""Handles writing out charge absorption by injectors."""
def __init__(self, diag_steps, injector, write_dir, **kwargs):
"""Generate and install function to write out charge accumulation.
Arguments:
diag_steps (int): Number of steps between each output
injector (emission.Injector): The particle injector stores
intermediate values.
write_dir (string): Directory to write CSV files to.
kwargs: See :class:`mewarpx.diags_store.diag_base.WarpXDiagnostic`
for more timing options.
"""
# Initialize variables
self.injector = injector
# Register with the injector, make sure we don't have two diagnostics
# attached, and initialize with fields.
if self.injector.injector_diag is not None:
raise RuntimeError("Cannot attach two InjectorFluxDiag objects "
"to the same injector.")
self.injector.injector_diag = self
self.injector.init_injectedparticles(self.injector.fields)
# Setup write-out capability
save_name = self.injector.name + '_injected.csv'
super(InjectorFluxDiag, self).__init__(
diag_steps=diag_steps,
write_dir=write_dir,
save_name=save_name,
**kwargs
)
def _collect_dataframe(self, clear=True):
partdict = self.injector.get_injectedparticles(clear=clear)
# Convert species_name & step to int, because they come out as floats.
partdict['species_id'] = partdict['species_id'].astype(int)
partdict['step'] = partdict['step'].astype(int)
df = pandas.DataFrame(partdict, columns=list(partdict.keys()))
return df
[docs]class SurfaceFluxDiag(ParticleCSVDiag):
"""Handles writing out for a single surface.
This class is used by FluxDiag; rarely used directly.
"""
# IF CHANGING THIS, CHANGE IN self._record_particle_flux() AS WELL.
fields = ['t', 'step', 'species_id', 'V_e', 'n', 'q', 'E_total']
def __init__(self, diag_steps, surface, write_dir, **kwargs):
"""Initialize surface-specific features.
Arguments:
diag_steps (int): Number of steps between each output
surface (mewarpx.Assembly object): The assembly object in which
particles are scraped.
write_dir (string): Directory to write CSV files to.
kwargs: See :class:`mewarpx.diags_store.diag_base.WarpXDiagnostic`
for more timing options.
"""
self.surface = surface
# Register with the surface, make sure we don't have two diagnostics
# attached, and initialize with fields.
if self.surface.scraper_diag is not None:
raise RuntimeError("Cannot attach two SurfaceFluxDiag objects "
"to the same surface.")
self.surface.scraper_diag = self
save_name = self.surface.name + '_scraped.csv'
super(SurfaceFluxDiag, self).__init__(
diag_steps=diag_steps,
write_dir=write_dir,
save_name=save_name,
**kwargs
)
# install diagnostic function to collect scraped particle data
self.surface.add_diag_fn(
diag_fn=self._record_particle_flux, attrib_list=["w", "E_total"]
)
# create the flux_dict to track fluxes
self.flux_array = AppendableArray(
typecode='d', unitshape=[len(self.fields)])
def _record_particle_flux(self, scraped_particle_dict):
"""Function to process the scraped particle dict for the total
particle and energy flux. The scraped particle array is cleared after
every step so it is known that all particles in the
scraped_particle_dict was scraped on this step.
Arguments:
scraped_particle_dict (dict): Dictionary containing scraped
particle data.
Note:
The total charge scraped and energy of the particles scraped
is multiplied by -1 since these quantities are leaving the system.
"""
# loop over species and process scraped particle data
for species in mwxrun.simulation.species:
data = np.zeros(7)
data[0] = mwxrun.get_t()
data[1] = mwxrun.get_it()
data[2] = species.species_number
data[3] = self.surface.getvoltage_e()
# When pre-seeding a simulation with plasma we inject particles over
# embedded boundaries which causes the first scraping step to
# show very large currents. For this reason we skip that first step
# but note that we inject after the first step so we need to skip
# scraping step 2.
if data[1] == 2:
self.flux_array.append(data)
continue
idx = np.where(scraped_particle_dict["species_id"] == data[2])
data[4] = np.size(idx)
data[5] = -species.sq * np.sum(scraped_particle_dict["w"][idx])
data[6] = -np.sum(scraped_particle_dict["E_total"][idx])
self.flux_array.append(data)
def _get_total_particle_flux(self, clear=False):
"""Get a dictionary containing the fluxes summed over processors.
Arguments:
clear (bool): If True, clear the particle data rows entered (field
names are still initialized as before). Default False.
Returns:
scrapedparticles_dict (collections.OrderedDict): Keys are the
originally passed field strings for lost particles. Values are
an (n)-shape numpy array for each field.
"""
lpdata = self.flux_array.data()
# Sum all except t/step/jsid/V_e from all processors
lpdata[:,4:] = parallel_util.parallelsum(np.array(lpdata[:,4:]))
lpdict = collections.OrderedDict(
[(fieldname, np.array(lpdata[:, ii], copy=True))
for ii, fieldname in enumerate(self.fields)])
if clear:
self.flux_array.cleardata()
return lpdict
def _collect_dataframe(self, clear=True):
partdict = self._get_total_particle_flux(clear=clear)
# Convert species_id & step to int, because they come out as floats.
partdict['species_id'] = partdict['species_id'].astype(int)
partdict['step'] = partdict['step'].astype(int)
df = pandas.DataFrame(partdict, columns=list(partdict.keys()))
return df
[docs]class FluxDiagBase(diag_base.WarpXDiagnostic):
"""Handle generic printing & plotting of charge injection and absorption.
This Base class should work with both postprocessing and during-run
analyses.
"""
default_printed_qtys = collections.OrderedDict([
('J', {'by_component': 'all',
'total': 'all',
'description': 'Current',
'units': 'A/cm^2'}),
('P', {'by_component': None,
'total': 'net',
'description': 'Power',
'units': 'W/cm^2'}),
('dQ', {'by_component': None,
'total': None,
'description': 'Heat transfer',
'units': 'W/cm^2'}),
('n', {'by_component': None,
'total': None,
'description': 'Macroparticle rate',
'units': 'particles per step'})
])
FLUX_DIAG_DIR = "fluxes"
def __init__(self, diag_steps, runinfo,
overwrite=True,
sig_figs=6,
printed_qtys=None,
fullhist_dict=None,
ts_dict=None,
**kwargs):
"""Generate and install function to write out fluxes.
Arguments:
diag_steps (int): Number of steps between each output
runinfo (:class:`mewarpx.runinfo.RunInfo`): RunInfo object is used
to get the species, injectors, surfaces, and system area.
overwrite (bool): If True the dill pickled save file will overwrite
the previous diagnostic period's saved file.
sig_figs (int): Number of significant figures in text output.
Default 6.
printed_qtys (dict): Override individual values of
default_printed_qtys; same input format but keys can be omitted
to use defaults.
fullhist_dict (dict): Dictionary of timeseries for the full run
ts_dict (dict): Dictionary of timeseries for the last 8 steps
kwargs: See :class:`mewarpx.diags_store.diag_base.WarpXDiagnostic`
for more timing options.
"""
# Save input variables
self.overwrite = overwrite
self.format_str = '{:.%dg}' % sig_figs
self.printed_qtys = copy.deepcopy(self.default_printed_qtys)
if printed_qtys is not None:
mwxutil.recursive_update(self.printed_qtys, printed_qtys)
self.component_dict = {}
for component in runinfo.component_list:
self.component_dict[component] = runinfo.component_labels.get(
component, component
)
self.species_list = [spec.name for spec in mwxrun.simulation.species]
self.fullhist_dict = fullhist_dict
if self.fullhist_dict is None:
self.fullhist_dict = collections.OrderedDict()
self.ts_dict = ts_dict
if self.ts_dict is None:
self.ts_dict = collections.OrderedDict()
self.write_dir = os.path.join(self.DIAG_DIR, self.FLUX_DIAG_DIR)
super(FluxDiagBase, self).__init__(
diag_steps=diag_steps,
**kwargs
)
[docs] def refresh_species(self, runinfo):
"""When species are updated, this can be used to re-synchronize an
existing FluxDiag with the new information. Will not update surfaces or
injectors, however.
Note:
In the future, it may be best to replace this with something that
more fully resets all references to runinfo, and/or just always
explicitly reference runinfo rather than getting information from
it at init. However, for now this is the quickest way to proceed.
Arguments:
runinfo (:class:`mewarpx.runinfo.RunInfo`): An updated RunInfo
object is solely used to get new species name information.
"""
self.species_list = [spec.name for spec in mwxrun.simulation.species]
[docs] def save(self, filepath=None):
"""Save only the critical variables to a pickle file. Since this can be
done often, a specific function is used to minimize file size.
Arguments:
filepath (str): If None, this is generated internally. Otherwise
save the pickle file to this path.
"""
if filepath is None:
if self.overwrite:
filename = "fluxdata.dpkl"
else:
filename = f"fluxdata_{mwxrun.get_it():010d}.dpkl"
filepath = os.path.join(self.write_dir, filename)
dict_to_save = {
'write_dir': self.write_dir,
'format_str': self.format_str,
'species_list': self.species_list,
'component_dict': self.component_dict,
'printed_qtys': self.printed_qtys,
'fullhist_dict': self.fullhist_dict,
'ts_dict': self.ts_dict,
}
with open(filepath, 'wb') as pfile:
dill.dump(dict_to_save, pfile)
[docs] def print_fluxes(self, tsdict):
"""Print the float results of calculations for the given dictionary of
timeseries.
Arguments:
tsdict (dict of (str, str, str)): First string is either 'inject' or
'scrape'; second string is the group name (eg cathode, accgrid);
third value is species_name for the species. Note any cropping
etc. should be done before this dict is passed in!
Returns:
resultstr (str): A string that can be printed for the performance.
"""
emit_full = timeseries.concat_crop_timeseries([
val for key, val in tsdict.items() if "inject" in key
])
collect_full = timeseries.concat_crop_timeseries([
val for key, val in tsdict.items() if "scrape" in key
])
resultstr = ""
for qty, flags in self.printed_qtys.items():
if flags['by_component']:
for component, componentstr in self.component_dict.items():
for species_name in self.species_list:
prefixstr = " ".join(
[componentstr, species_name.title(),
flags['description']]
)
emit_ts = tsdict.get(
('inject', component, species_name), None)
collect_ts = tsdict.get(
('scrape', component, species_name), None)
suffixstr = flags['units']
net_only = (flags['by_component'] == 'net')
resultstr += self.print_fluxset(
prefixstr=prefixstr, key=qty, suffixstr=suffixstr,
emit_ts=emit_ts, collect_ts=collect_ts,
net_only=net_only
)
if flags['total']:
prefixstr = " ".join(["Total", flags['description']])
suffixstr = flags['units']
net_only = (flags['total'] == 'net')
resultstr += self.print_fluxset(
prefixstr=prefixstr, key=qty, suffixstr=suffixstr,
emit_ts=emit_full, collect_ts=collect_full,
net_only=net_only
)
return resultstr
[docs] def print_fluxset(self, prefixstr, key, suffixstr, emit_ts=None,
collect_ts=None, net_only=False):
"""Print emitted and collected flux for specific timeseries and specific
flux quantity.
Arguments:
prefixstr (str): String that goes before numerical value. Should
include a label for the key.
key (str): Key for the quantity in the timeseries to print.
suffixstr (str): String after the numerical value, usually units.
emit_ts (:class:`mewarpx.diags_store.timeseries.Timeseries`):
Emitted flux timeseries.
collect_ts (:class:`mewarpx.diags_store.timeseries.Timeseries`):
Collected flux timeseries.
net_only (bool): Only print net value, not emitted and collected.
Returns:
resultstr (str): A string that can be printed for the performance.
"""
print_emission = (emit_ts is not None) and (not net_only)
print_collection = (collect_ts is not None) and (not net_only)
print_net = (
# Only print net and at least one ts is not None
(net_only and ((emit_ts is not None) or (collect_ts is not None)))
# Print net because have both emission and collection
or (print_emission and print_collection)
)
emit_value = 0.
if emit_ts is not None:
emit_value = emit_ts.get_averagevalue_by_key(key)
collect_value = 0.
if collect_ts is not None:
collect_value = collect_ts.get_averagevalue_by_key(key)
resultstr = ""
if print_emission:
resultstr += " ".join([
prefixstr,
"Emitted:",
self.format_str.format(emit_value),
suffixstr])
resultstr += "\n"
if print_collection:
resultstr += " ".join([
prefixstr,
"Collected:",
self.format_str.format(collect_value),
suffixstr])
resultstr += "\n"
if print_net:
resultstr += " ".join([
prefixstr,
"Net:",
self.format_str.format(emit_value + collect_value),
suffixstr])
resultstr += "\n"
return resultstr
[docs] def plot_fluxes(self, ts_dict, save=False):
"""
Arguments:
save (bool): If True, save and close figure. write_dir must be
defined. If False, leave figure open and return it.
"""
fig, axlist = plt.subplots(2, 2, figsize=(14, 8.5))
# List of axes properties
axlist = [x for y in axlist for x in y]
qty_list = [
{'key': 'J',
'ylabel': r'Current (A/$\mathrm{cm}^2$)',
'title': 'Simulation currents into system'},
{'key': 'P',
'ylabel': r'Power (W/$\mathrm{cm}^2$)',
'title': 'Component power production'},
{'key': 'dQ',
'ylabel': r'Heat (W/$\mathrm{cm}^2$)',
'title': 'Component heat transfer into system'},
{'key': 'n',
'ylabel': 'Macroparticles',
'title': 'Simulation particles handled'},
]
try:
label_list = [
'_'.join([
x[0].title(),
x[1].title(),
x[2].title()
])
for x in list(ts_dict.keys())
]
except KeyError:
logger.error(
"NOTE: KeyErrors in flux plotting can be caused by creating "
"species after RunInfo is saved (eg by initiating a "
"TraceParticleInjector after RunInfo). All species should be "
"initiated before RunInfo."
)
raise
if mwxrun.get_it() * mwxrun.get_dt() < 0.5e-6:
xlabel = 'Time (ns)'
else:
xlabel = r'Time ($\mu$s)'
for ax, qtydict in zip(axlist, qty_list):
typekey = qtydict['key']
timeseries.TimeseriesPlot(
array_list=[
(
label,
ts_dict[x].get_timeseries_by_key(typekey)
)
for (label, x) in
zip(label_list, list(ts_dict.keys()))
],
ax=ax,
xlabel=xlabel,
ylabel=qtydict['ylabel'],
title=qtydict['title'],
titlesize=18,
labelsize=16,
alpha=0.7,
legend=False
)
fig.legend(handles=ax.get_lines(), labels=label_list,
loc='lower center', fontsize=16,
frameon=True, ncol=3)
fig.tight_layout()
fig.subplots_adjust(bottom=0.13 + int((len(label_list)-1) / 3) * 0.04,
hspace=0.33)
if save:
fig.savefig(
os.path.join(
self.write_dir,
'flux_plots_{:010d}.png'.format(mwxrun.get_it())
), dpi=300
)
plt.close(fig)
return fig
[docs] def get_net_flux_timeseries(self, electrode_name, flux_type='J'):
"""Sum the emitted and absorbed flux across all species for a given
electrode, and return a timeseries array containing the net flux from
the electrode surface into the simulation volume.
Arguments:
electrode_name (str): Surface name such as 'cathode' or 'anode'.
Must be present in the RunInfo injector_dict and surface_dict in
order to be a valid key.
flux_type (str): Either 'J', 'P', 'dQ', or 'n'. Defaults to 'J'.
Returns:
flux_timeseries (np.ndarray): n x 2 array containing the time in
seconds in first column and the net flux from the electrode in
second column.
"""
flux_data = timeseries.concat_crop_timeseries([
val for key, val in self.fullhist_dict.items()
if electrode_name in key
])
if flux_data is None:
raise ValueError(
'No flux data found for "%s" electrode!' % electrode_name
)
return flux_data.get_timeseries_by_key(flux_type)
[docs]class FluxDiagnostic(FluxDiagBase):
"""Handles writing out charge injection and absorption."""
def __init__(self, diag_steps, runinfo, overwrite=True,
history_maxlen=5000, sig_figs=6,
printed_qtys=None,
check_charge_conservation=True,
print_per_diagnostic=True, print_total=False,
plot=True, save_csv=False, profile_decorator=None,
**kwargs):
"""Generate and install function to write out fluxes.
Arguments:
diag_steps (int): Number of steps between each output
runinfo (:class:`mewarpx.runinfo.RunInfo`): RunInfo object is used
to get the species, injectors, surfaces, and system area.
overwrite (bool): If True the dill pickled save file will overwrite
the previous diagnostic period's saved file.
history_maxlen (int): Maximum length of full history to keep. If
this is exceeded, history is resampled to a 2x lower frequency.
Default 5000.
sig_figs (int): Number of significant figures in text output.
Default 6.
printed_qtys (dict): Override individual values of
default_printed_qtys; same input format but keys can be omitted
to use defaults.
check_charge_conservation (bool): Whether to check if charge is
conserved in simulation.
print_per_diagnostic (bool): Whether to print current results for
the latest diagnostic period.
print_total (bool): Whether to print total history of fluxes after a
diagnostic period.
plot (bool): Whether to save a plot of fluxes after each diagnostic
period.
save_csv (bool): Whether to save csv files of scraped / injected
particles.
profile_decorator (decorator): A decorator used to profile the
timeseries update methods and related functions.
kwargs: See :class:`mewarpx.diags_store.diag_base.WarpXDiagnostic`
for more timing options.
"""
# Save input variables
self.runinfo = runinfo
self.history_maxlen = history_maxlen
self.history_dt = mwxrun.get_dt()
self.check_charge_conservation = check_charge_conservation
self.print_per_diagnostic = print_per_diagnostic
self.print_total = print_total
self.plot = plot
if save_csv:
csv_write_dir = os.path.join(self.DIAG_DIR, self.FLUX_DIAG_DIR)
else:
csv_write_dir = None
if profile_decorator is not None:
self.update_ts_dict = profile_decorator(self.update_ts_dict)
self.update_fullhist_dict = (
profile_decorator(self.update_fullhist_dict)
)
self.print_fluxes = profile_decorator(self.print_fluxes)
self.plot_fluxes = profile_decorator(self.plot_fluxes)
self.injector_dict = runinfo.injector_dict
self.surface_dict = runinfo.surface_dict
self.diags_dict = collections.OrderedDict()
for key, val in self.injector_dict.items():
self.injector_dict[key] = mwxutil.return_iterable(val)
self.diags_dict[('inject', key)] = [
InjectorFluxDiag(diag_steps=diag_steps,
injector=injector,
write_dir=csv_write_dir,
**kwargs)
for injector in self.injector_dict[key]
]
for key, val in self.surface_dict.items():
self.surface_dict[key] = mwxutil.return_iterable(val)
self.diags_dict[('scrape', key)] = [
SurfaceFluxDiag(diag_steps=diag_steps,
surface=surface,
write_dir=csv_write_dir,
**kwargs)
for surface in self.surface_dict[key]
]
# Initialize other variables
self.last_run_step = 0
super(FluxDiagnostic, self).__init__(
diag_steps=diag_steps,
runinfo=runinfo,
overwrite=overwrite,
sig_figs=sig_figs,
printed_qtys=printed_qtys,
**kwargs
)
self.check_scraping()
# if this run is from a restart, try to load flux data up to the
# current point
# mwxrun.restart isn't set until mwxrun.init_run is called, so the
# lambda function is needed here
callbacks.installafterinit(
lambda _=None:
self._load_checkpoint_flux() if mwxrun.restart else None
)
callbacks.installafterstep(self._flux_ana)
[docs] def check_scraping(self):
"""Checks that particles scraping at the appropriate boundaries is
turned on when flux diagnostics are on."""
for surf_name, surface in self.surface_dict.items():
if isinstance(surface, list):
if len(surface) > 1:
raise NotImplementedError("Distinct surfaces not supported")
surface = surface[0]
scrape_flag = (
f"save_particles_at_{surface.scraper_label.replace('_', '')}"
)
for sp in mwxrun.simulation.species:
if not getattr(sp, scrape_flag):
logger.warning(
f"{surface.scraper_label} scraping is turned off "
f"for {sp.name} but {surf_name} requires it."
)
def _flux_ana(self):
"""Perform the calculation and processing of current data from the
dataframes.
The Timeseries object is used to store multiple keys for one collection
of injectors and/or surfaces and one species. To facilitate that, we go
by diagnostic object, create a timeseries by species, and concatenate
timeseries by group of objects.
We also keep a from-the-beginning tally of similar timeseries, so we can
append the present timeseries to them, and then resample those if
needed.
"""
if self.check_timestep():
self.update_ts_dict()
if mwxrun.me == 0:
self.update_fullhist_dict()
if self.check_charge_conservation:
self._check_charge_conservation()
if self.print_per_diagnostic:
logger.info(
"\nTHIS DIAGNOSTIC PERIOD:\n"
+ self.print_fluxes(self.ts_dict)
)
if self.print_total:
logger.info(
"\nTOTAL HISTORY:\n"
+ self.print_fluxes(self.fullhist_dict)
)
if self.plot:
self.plot_fluxes(self.fullhist_dict, save=True)
self.save()
self.last_run_step = mwxrun.get_it()
[docs] def update_ts_dict(self):
"""Run early in flux analysis to get this diagnostic period's
timeseries.
"""
self.ts_dict = collections.OrderedDict()
for (keytype, key), diaglist in self.diags_dict.items():
for diagobj in diaglist:
df = diagobj.charge_accum_diag()
# only need to hold a copy of the timeseries on root
if mwxrun.me == 0:
species_list = diagobj.get_species_list()
for sp in species_list:
subdf = df[df['species_id'] == sp]
ts = FluxCalcDataframe(
df=subdf,
area=self.runinfo.area,
step_begin=self.last_run_step + 1,
step_end=mwxrun.get_it() + 1
)
sp_name = mwxrun.simulation.species[sp].name
if (keytype, key, sp_name) in self.ts_dict:
self.ts_dict[(keytype, key, sp_name)] = (
timeseries.concat_crop_timeseries(
[self.ts_dict[(keytype, key, sp_name)], ts]
)
)
else:
self.ts_dict[(keytype, key, sp_name)] = ts
[docs] def update_fullhist_dict(self):
"""Once current diagnostic period is updated, update full history and
resample if needed.
"""
maxlen = 0
# Build up strided, full-history timeseries.
# Note fullkey refers to tuples of (keytype, key, species_name)
for fullkey in self.ts_dict:
if fullkey in self.fullhist_dict:
self.fullhist_dict[fullkey] = (
timeseries.concat_crop_timeseries(
[self.fullhist_dict[fullkey],
self.ts_dict[fullkey]],
dt=self.history_dt
)
)
else:
# We force all full histories to start at timestep 0. Otherwise
# if eg anode absorption starts after first diagnostic period,
# we'll have different times in denominators of different
# values.
self.fullhist_dict[fullkey] = timeseries.concat_crop_timeseries(
[self.ts_dict[fullkey]], step_begin=0, dt=self.history_dt)
maxlen = max(
maxlen,
self.fullhist_dict[fullkey].step_end
- self.fullhist_dict[fullkey].step_begin
)
# Resize full history if we're above the maximum length - this
# resamples, including smoothing, to half the size. But only if the new
# dt of the fullhist_dict will be less than a diagnostic interval.
if maxlen > self.history_maxlen:
if self.history_dt * 4. < mwxrun.get_dt() * self.diag_steps:
self.history_dt *= 2.
for val in list(self.fullhist_dict.values()):
val.resample(new_dt=self.history_dt, inplace=True)
else:
self.history_maxlen = maxlen
def _check_charge_conservation(self):
"""Function to check net current flow into simulation during the last
diagnostic period.
"""
full_ts = timeseries.concat_crop_timeseries([
val for key, val in self.ts_dict.items()])
net_current = full_ts.get_averagevalue_by_key('J')
if abs(net_current) > 0.5:
if abs(net_current) > 1e3:
raise RuntimeError(
('Step %d: Net current exceeds 1000 A/cm^2, which is almost'
' definitely an error.') % mwxrun.get_it()
)
logger.warning(
f"Step {mwxrun.get_it()}: Net current ({net_current:.3f} "
"A/cm^2) exceeds 0.5 A/cm^2, which likely indicates a "
"violation of the CFL condition."
)
def _load_checkpoint_flux(self):
"""Function to load the flux data from a simulation checkpoint during
a restart. The 'old' flux data will be recorded in the fullhist_dict so
that the simulation output will contain the full flux history and not
just the flux timeseries after the current restart.
"""
# the current simulation step presumably matches the restart step
restart_step = mwxrun.get_it()
# get the flux diag file for the current step
flux_diag_file = os.path.join(
mwxrun.checkpoint_dir, mwxrun.checkpoint, "fluxdata.ckpt"
)
# throw an error if flux diag file does not exist
if not os.path.isfile(flux_diag_file):
raise RuntimeError(
f"{flux_diag_file} doesn't exist but is needed to restart."
)
logger.warning(
"Loading old flux data at restart assuming names of conductors and "
"species in the simulation have not changed"
)
# update the full history with the old history
old_fluxdiag = FluxDiagFromFile(fluxdatafile=flux_diag_file)
self.fullhist_dict = old_fluxdiag.fullhist_dict
self.last_run_step = restart_step
self.history_dt = list(self.fullhist_dict.values())[0].dt
[docs]class FluxCalcDataframe(timeseries.Timeseries):
"""A FluxCalc created from a pandas Dataframe.
Dataframes passed to this class should only consist of a single species; as
a result multiple lines at the same timestep are not supported. Desired
concatenation or summation across timeseries should be done on the
processed timeseries, not on the raw dataframes.
**INSERT TEXT ABOUT EXTRA COLUMNS HERE -- ESPECIALLY IMPLICATIONS FOR WHICH
QUANTITIES MAKE SENSE BECAUSE THE NORMALIZATION BY WEIGHT MAKES SENSE FOR
THEM**
All calculations are done with reference to ground (V_e=0).
"""
def __init__(self, df, area, step_begin=None, step_end=None, dt=None):
"""Initialize the FluxCalc object, transforming df into the appropriate
array.
Arguments:
df (pandas.DataFrame): A single dataframe with columns:
- t: Time in sec
- step: Step number, integer, with constant dt necessary.
- n: Number of macroparticles
- q: Injected/deposited charge
- E_part: Total KE + PE of the particles, relative to phi = 0,
assuming potential energy = q*phi. For scraped data, this has
already been multiplied by -1 in the scraper to denote energy
leaving the system.
- V_e: Fermi level of the emitting or collecting surface.
- Optional extra columns
area (float): Area to consider for calculating current densities,
usually Lx*Ly from the simulation, in m^2. Values are
transformed to /cm^2 internally.
step_begin (int): Beginning timestep to consider. If not provided,
take min step in df. Must be <= min step in df.
step_end (int): Last timestep +1 to consider. If not provided,
take max step in df +1. Must be > max step in df.
dt (float): Timestep increment. If not provided, use warp.top.dt.
Must provide in post-processing.
"""
self.area = area
self.fac = 1.0 / (1e4 * self.area)
if step_begin is None:
step_begin = df['step'].min()
if step_end is None:
step_end = df['step'].max() + 1
if dt is None:
dt = mwxrun.get_dt()
special_keys = ['t', 'species_id', 'step', 'n', 'q', 'E_total', 'V_e']
# we need to get the sum of collected weight for each species in order
# to properly average extra arrays by the total number of particles
spid_array = np.array((df['species_id']))
sq_array = np.array([spec.sq for spec in mwxrun.simulation.species])
q_array = np.array(df['q'])
weight_sum = np.zeros_like(spid_array)
idx = np.where(q_array != 0.0)
weight_sum[idx] = np.abs(
q_array[idx]
/ sq_array[spid_array[idx]]
)
idx = np.where(weight_sum == 0.0)
weight_sum[idx] = np.array(df['n'])[idx]
extra_keys = ['weight_sum']
extra_arrays = [weight_sum]
for key in df.keys():
if key not in special_keys:
extra_keys.append(key)
extra_arrays.append(np.array(df[key]))
num_extra_arrays = len(extra_arrays)
# this is necessary or else numba doesn't play nice
if num_extra_arrays == 0:
extra_arrays = np.zeros((1, 1))
else:
extra_arrays = np.array(extra_arrays)
timeseries_array = gen_timeseries(
step_begin=step_begin, step_end=step_end,
dt=dt,
step_array=np.array(df['step'], dtype=int),
n_array=np.array(df['n']),
q_array=self.fac*q_array,
E_array=self.fac*np.array(df['E_total']),
V_e_array=np.array(df['V_e']),
extra_arrays=extra_arrays,
num_extra_arrays=num_extra_arrays
)
array_dict = {
'n': timeseries_array[:, 0],
'J': timeseries_array[:, 1],
'dQ': timeseries_array[:, 2],
'P': timeseries_array[:, 3],
}
for ii, key in enumerate(extra_keys):
array_dict[key] = timeseries_array[:, 4+ii]
super(FluxCalcDataframe, self).__init__(
step_begin=step_begin, step_end=step_end, dt=dt,
array_dict=array_dict
)
[docs]class FluxDiagFromFile(FluxDiagBase):
"""Load a flux diag from its minimal saved file.
This is meant for postprocessing ONLY; it will not initialize things
properly for during-run operations.
"""
def __init__(self, basedir='diags', fluxdatafile=None,
fluxdatafileformat='fluxes/fluxdata*', fs=None):
"""Load from fluxdata_XXXXXXXXXX.dpkl files.
Arguments:
basedir (str): Base directory of the diagnostic files.
fluxdatafile (str): Filename dill-pickled with FluxDiagBase.save().
fluxdatafileformat (str): If a specific fluxdatafile is not
specified, this naming format will be used to search for the
latest fluxdata file in the base directory.
fs (s3fs filesystem): Optional S3 filesystem to load data directly
from a S3 bucket.
"""
if fs is None:
self.open_command = open
self.exists_command = os.path.exists
self.glob_command = glob.glob
else:
self.open_command = fs.open
self.exists_command = fs.exists
self.glob_command = fs.glob
self.fluxdatafileformat = fluxdatafileformat
if fluxdatafile is None:
fluxdatafile = self._get_fluxdatafile(basedir)
with self.open_command(fluxdatafile, 'rb') as pfile:
dict_to_load = dill.load(pfile)
self.__dict__.update(dict_to_load)
def _get_fluxdatafile(self, basedir):
"""Function to look for latest fluxdata file from the base directory.
"""
flux_files = sorted(
self.glob_command(os.path.join(basedir, self.fluxdatafileformat))
)
if len(flux_files) == 0:
raise IOError(
f'No fluxdata files found in base directory: {basedir}'
)
return flux_files[-1]
# ### Numba functions for FluxCalcDataframe ###
[docs]@numba.jit(nopython=True)
def gen_timeseries(step_begin, step_end, dt, step_array, n_array,
q_array, E_array, V_e_array, extra_arrays, num_extra_arrays):
"""Transform dataframe entries, with potentially sparse rows (eg some
timesteps have no associated row), into an array capturing all timesteps
between step_begin and step_end. Multiple rows for the same timestep are
not supported.
Note:
Signs are somewhat confusing here! For particles that don't have
scattering or ionization, E_array is essentially KE_emit + q*V_emit. dQ
for emission is then KE_emit + q*V_emit - q*V_e, but V_emit = V_e - WF,
so dQ = KE_emit - q*WF. Since q is negative for electrons, we're adding
|q|*WF potential energy into simulation, which does correctly represent
surface cooling due to electron evaporation.
dQ for absorption -- the arguments passed in, E_array and q_array, are
already both negated to represent absorption vs injection. With
standard physical signs for q, Q, KE, etc., then,
dQ = -(KE_emit + q*V_cathode) + q*V_e_anode =
-(KE_emit + q(V_cathode - (V_anode + WF))), where V_cathode and V_anode
are vacuum biases. The front minus sign indicates absorption of this
heat onto the surface/out of the system. q*(V_cathode - V_anode)
captures kinetic energy from accelerating the particle across the gap.
-q*WF is the heat deposited from the electron falling back to the Fermi
level. So it should all work, and tests confirm energy conservation --
see test_diags_fluxdiag.py.
Arguments:
step_begin (int): First step to consider
step_end (int): n + 1th step to consider (eg python indexing)
dt (float): Timestep in seconds, necessary for normalization
step_array (np.ndarray): m-length array of the step number for each row.
All entries in step_array must be integers >= step_begin and <
step_end. m is the number of rows in the original dataframe.
n_array (np.ndarray): m-length array of the macroparticle count for each
row.
q_array (np.ndarray): m-length array of the charge (in C) for each row.
E_array (np.ndarray): m-length array of the KE + PE for each row. For
scraped data, this has already been multiplied by -1 in the scraper
to denote energy leaving the system.
V_e_array (np.ndarray): m-length array of the Fermi level voltage for
each row.
extra_arrays (np.ndarrays): m-length arrays that are averaged into a
timeseries.
num_extra_arrays (int): Number of extra arrays included.
Returns:
timeseries_array (np.ndarray): (step_end - step_begin)x(4
+ len(extra_arrays)) array with n, J, dQ, P at each timestep. Extra
arrays are also made into a timeseries.
Q=(E_part - q*V_e)/dt, and P=-(q*V_e)/dt (eg electrical power). Signs
mean we get heat INTO system, and "standard" power production. (Note q
is signed like Q: positive into system / negative out of system. That
means electrons leaving the system give positive q.)
"""
timeseries_array = np.zeros((step_end - step_begin, 4 + num_extra_arrays))
for idx in range(len(step_array)):
i_step = step_array[idx] - step_begin
timeseries_array[i_step, 0] = n_array[idx]
timeseries_array[i_step, 1] = q_array[idx]/dt
timeseries_array[i_step, 2] = (
E_array[idx] - q_array[idx]*V_e_array[idx]
) / dt
timeseries_array[i_step, 3] = -q_array[idx]*V_e_array[idx]/dt
for ii in range(0, num_extra_arrays):
timeseries_array[i_step, 4+ii] = extra_arrays[ii][idx]
return timeseries_array