"""Base diagnostic code used in many diagnostics."""
import logging
import sys
import time
import numpy as np
from mewarpx.mwxrun import mwxrun
import mewarpx.utils_store.util as mwxutil
from pywarpx import callbacks
# import psutil
# Get module-level logger
logger = logging.getLogger(__name__)
[docs]class WarpXDiagnostic(object):
"""Hold attributes common to all diagnostics, especially logic on when to
run.
"""
DIAG_DIR = "diags"
def __init__(self, diag_steps, diag_step_offset=0,
extended_interval_level=None,
manual_timesteps=None,
):
"""Initialize timing. Will often be called within a child object's
init.
Arguments:
diag_steps (int): Run the diagnostic with this period.
diag_step_offset (int): Run when ``simulation_step % diag_steps =
diag_step_offset``. Default 0.
extended_interval_level (int): Enable an exponentially-increasing
interval as the run progresses. If None (default), calculate
every time period. Otherwise, the interval increases on an
exponential scale. 0 is most aggressive at skipping diagnostic
timesteps; 1, 2, etc. are progressively less aggressive but
still exponential. Specifically, 0 executes once during each
power-of-2 set of diagnostic steps; 1 twice; 2 four times, etc.
manual_timesteps (list of ints): A list of timesteps to execute on
within each diag_steps. If this list contains steps for the
entire run, set diag_steps to a very large number. Default
None.
"""
self.diag_steps = int(round(diag_steps))
self.diag_step_offset = int(round(diag_step_offset))
self.extended_interval_level = extended_interval_level
if self.extended_interval_level is not None:
self.extended_interval_level = int(round(
self.extended_interval_level))
self.manual_timesteps = manual_timesteps
if self.manual_timesteps is not None:
self.manual_timesteps = [int(round(x))
for x in self.manual_timesteps]
if (
(self.manual_timesteps is not None)
and (self.extended_interval_level is not None)
):
raise ValueError("manual_timesteps and extended_interval_level "
"cannot be used together; one must be None.")
if mwxrun.initialized:
raise RuntimeError("You must install all diagnostics before "
"initializing warpx")
# handle creating the folder to save diagnostics
callbacks.installafterinit(self._create_diag_folder)
def _create_diag_folder(self):
"""Helper function to create folder in which to save diagnostics. Child
classes that save diagnostic files should have a ``write_dir``
attribute specifying the directory in which it will save diagnostic
files."""
if hasattr(self, "write_dir") and mwxrun.me == 0:
mwxutil.mkdir_p(self.write_dir)
[docs] def check_timestep(self):
"""Check if the diagnostic should run on this timestep.
Returns:
l_execute (bool): If True, run on this timestep. If False, don't.
"""
it = mwxrun.get_it()
if self.manual_timesteps is not None:
return (it % self.diag_steps) in self.manual_timesteps
if (it % self.diag_steps) == self.diag_step_offset:
return (
(self.extended_interval_level is None)
or self._comp_calcinterval(it)
)
return False
def _comp_calcinterval(self, it):
"""Decide whether to calculate current based on exponential scaling
algorithm.
The algorithm forces the number of diagnostic period to be divisible by
some power of 2 below it. If calcinterval_level = 0, it must be a power
of 2 (e.g. for diagnum between 64 and 127, it must be divisible by 64
=> 64 only). For each increase in calcinterval_level, relax the
division by one power of 2. For example, if calcinterval_level = 3 and
diagnum is between 128 and 255, diagnum must be divisible by 128/2^3 =
16.
Also sets self.stride_multiplier.
Arguments:
it (int): The current integer step number
"""
diagnum = it // self.diag_steps
diagnum_log2 = np.floor(np.log2(diagnum))
diagnum_divisor = int(max(
2**(diagnum_log2 - self.extended_interval_level), 1))
self.stride_multiplier = diagnum_divisor
if diagnum % diagnum_divisor == 0:
return True
else:
next_diagnum = ((diagnum // diagnum_divisor) + 1)*diagnum_divisor
next_itnum = next_diagnum * self.diag_steps + self.diag_step_offset
logger.info(
f"Waiting until diagnostic period {next_diagnum} (step {next_itnum}) to run "
f"diagnostic due to extended_interval_level = {self.extended_interval_level}"
)
return False
[docs]class TextDiag(WarpXDiagnostic):
"""Output diagnostics every certain number of steps.
Contains:
text_diag (function): Function to do the write-out. Use only if needed
for a future reference in script.
"""
def __init__(self, diag_steps, preset_string='default',
custom_string=None, install=True, **kwargs):
"""Generate and install function to write out step #.
Arguments:
diag_steps (int): Number of steps between each output
simulation (mespecies.Simulation): Main simulation object
preset_string (str): Defaults to choose between:
- ``default`` - just the step number and total particle num
- ``perfdebug`` - like particledebug, plus interval wall time,
step rate, and particle-step rate
- ``memdebug`` - print out verbose memory usage information
custom_string (str): Overrides preset_string if not None. The full
string to output, with:
- ``{step}`` formatting symbol for where the step number should
go
- ``{wall_time}`` run time of the last diag_steps steps
- ``{step_rate}`` diag_steps / wall_time
- ``{particle_step_rate}`` nplive * diag_steps / wall_time
- ``{nplive}`` for number of live particles (global).
- ``{npperspecies}`` for number of particles per species
(global).
- ``{iproc}`` for the current processor number
- ``{system_memory}`` for verbose information on system memory
usage.
- ``{memory_usage}`` for memory usage of the current process
only.
install (bool): If False, don't actually install this into WarpX.
Use if you want to call manually for debugging.
kwargs: See :class:`mewarpx.mewarpx.diags_store.diag_base.WarpXDiagnostic`
for more timing options.
"""
self.defaults_dict = {
'default': "Step #{step:6d}; {nplive:8d} particles",
'perfdebug': ("Step #{step:6d}; {nplive:8d} particles "
"{npperspecies} "
"{wall_time:6.1f} s wall time; "
"{step_rate:4.2f} steps/s; "
"{particle_step_rate:4.2f} particle*steps/s in the last {diag_steps} steps; "
"{particle_step_rate_total:4.2f} particle*steps/s overall"),
'memdebug': ("{system_memory}\n"
"Proc {iproc} usage:\n{memory_usage}"),
}
if custom_string is not None:
self.diag_string = custom_string
else:
if preset_string not in self.defaults_dict:
logger.warning(("Preset {} not found for set_step_diag, "
"using default").format(preset_string))
preset_string = 'default'
self.diag_string = self.defaults_dict[preset_string]
super(TextDiag, self).__init__(diag_steps=diag_steps, **kwargs)
callbacks.installafterinit(self.init_timers_and_counters)
if install:
callbacks.installafterstep(self.text_diag)
[docs] def init_timers_and_counters(self):
"""Start timers."""
# In warp we used a specific walltime counter it had
# (warp.top.steptime). Not sure what issues we'll hit just using time
# here.
self.prev_time = time.time()
self.start_time = self.prev_time
self.prev_step = mwxrun.get_it()
self.particle_steps_total = 0
self.previous_particle_steps_total = 0
[docs] def text_diag(self):
"""Write requested information to output."""
if self.check_timestep():
live_parts, parts_per_species_str = self._get_part_nums()
# Loop everything else so run doesn't crash on diag error
try:
wall_time = time.time() - self.prev_time
steps = mwxrun.get_it() - self.prev_step
# This isn't perfectly accurate, but it's a good approximation
self.particle_steps_total += live_parts * steps
if wall_time > 0:
particle_step_rate = (
self.particle_steps_total
- self.previous_particle_steps_total
) / wall_time
step_rate = steps / wall_time
else:
step_rate = 0
particle_step_rate = 0
total_elapsed_time = time.time() - self.start_time
particle_step_rate_total = self.particle_steps_total / total_elapsed_time
self.status_dict = {
'step': mwxrun.get_it(),
'nplive': live_parts,
'npperspecies': parts_per_species_str,
'wall_time': wall_time,
'step_rate': step_rate,
'particle_step_rate': particle_step_rate,
"diag_steps": self.diag_steps,
"particle_step_rate_total": particle_step_rate_total,
# TODO: Reimplement when we have new parallel comm hook.
# 'iproc': mwxutil.iproc,
'iproc': None,
}
# Iff memory usage is requested, compute it.
if (("system_memory" in self.diag_string)
or ("memory_usage" in self.diag_string)):
self.update_memory()
# Support child objects by having arbitrary updates
self._update_status_dict()
# Print the string with everything that ended up in the
# dictionary.
logger.info(self.diag_string.format(**self.status_dict))
# Flush everything here. At least running through slurm, we've
# sometimes seen printouts take hours to flush. While it's
# pretty arbitrary where this is in mewarpx, TextDiag will
# almost always want it, so I think this makes sense.
sys.stdout.flush()
self.previous_particle_steps_total = self.particle_steps_total
except Exception as err:
logger.error(
f"Failed to output diag_string {self.diag_string} "
f"with error {err}"
)
self.prev_time = time.time()
self.prev_step = mwxrun.get_it()
def _update_status_dict(self):
"""Child objects can override this to make additional modifications to
status dict.
Do this by assigning key/value pairs into self.status_dict.
"""
def _get_part_nums(self):
"""Handle fetching of particle numbers."""
live_parts = mwxrun.get_npart()
npart_dict = mwxrun.get_npart_species_dict()
parts_per_species_str = '[{}]'.format(
', '.join(
"{}: {}".format(key, val) for key, val in npart_dict.items()
)
)
return live_parts, parts_per_species_str
[docs] def update_memory(self):
"""Update memory usage information with psutil."""
raise NotImplementedError("Need new iproc implementation to use.")
# # See psutil/scripts/meminfo.py and
# # https://stackoverflow.com/questions/276052/how-to-get-current-cpu-and-ram-usage-in-python
# # for some inspiration.
# if mwxutil.iproc == 0:
# sysmem = "SYSTEM MEMORY USAGE\n-------------------\n"
# sysmem += self._prettyprint_mem(psutil.virtual_memory())
# sysmem += "\nSWAP USAGE\n----------\n"
# sysmem += self._prettyprint_mem(psutil.swap_memory())
# self.status_dict['system_memory'] = sysmem
# else:
# self.status_dict['system_memory'] = ""
# # Gets current process w/ no argument
# proc = psutil.Process()
# procmem = self._prettyprint_mem(proc.memory_info())
# self.status_dict['memory_usage'] = procmem
@staticmethod
def _prettyprint_mem(namedtuple):
"""Helper function for memory usage printing. From
psutil/scripts/meminfo.py.
Returns string.
"""
raise NotImplementedError("Need new iproc implementation to use.")
# memstr = ""
# for name in namedtuple._fields:
# value = getattr(namedtuple, name)
# if name != 'percent':
# value = psutil._common.bytes2human(value)
# memstr += '%-10s : %7s\n' % (name.capitalize(), value)
# return memstr
[docs] def print_performance_summary(self):
total_time = time.time() - self.start_time
total_timesteps = mwxrun.get_it()
steps_per_second = total_timesteps / total_time
steps_per_second_per_proc = steps_per_second / mwxrun.n_procs
particle_steps_per_second = self.particle_steps_total / total_time
particle_steps_per_second_per_proc = (
particle_steps_per_second / mwxrun.n_procs
)
logger.info("### Run Summary ###")
logger.info(f"steps / second : {steps_per_second:.4f}")
logger.info(f"steps / second / proc : {steps_per_second_per_proc:.4f}")
logger.info(f"particle * steps / second : {particle_steps_per_second:.4f}")
logger.info(f"particle * steps / second / proc : {particle_steps_per_second_per_proc:.4f}")