"""
Module for various types of particle emission in WarpX.
"""
import collections
# import collections
import logging
import warnings
import matplotlib.colors as colors
import matplotlib.pyplot as plt
import numba
import numpy as np
import scipy.interpolate
import scipy.stats
import skimage.measure
from mewarpx.mespecies import Species
from mewarpx.mwxrun import mwxrun
from mewarpx.utils_store import appendablearray, parallel_util
import mewarpx.utils_store.mwxconstants as constants
import mewarpx.utils_store.util as mwxutil
from pywarpx import callbacks, picmi
# Get module-level logger
logger = logging.getLogger(__name__)
[docs]class Injector(object):
"""Base class for injection.
All injectors must include an emitter object, and should also include a
'name' field for diagnostics.
"""
emitter = None
# This is overridden if a diagnostic is installed to record injected
# current.
injector_diag = None
# fields is used by the diags.FluxInjectorDiag to know what to write to
# the CSV file. It can be overridden by child classes, but is not currently
# adjustable by the user.
# IF CHANGING THIS, CHANGE IN self.record_injectedparticles() AS WELL.
fields = ['t', 'step', 'species_id', 'V_e', 'n', 'q', 'E_total']
# @staticmethod
# def setup_warp():
# """Stuff that needs to be set before injectors are used."""
# # Update warp derived quantities if needed.
# warp.derivqty()
# # Record E_total
# if 'E_total' not in warp.Species._addedpids:
# warp.Species.addpid('E_total')
[docs] @staticmethod
def compute_npart(npart_total, unique_particles):
"""Compute number of particles to insert at a given timestep.
This function translates between total particle number and this
processor's particle numbers. If particles are designated "unique",
none are discarded by WarpX so we have logic here to give the processor
the right number of particles, with additional logic to load-balance
the remainder. If unique_particles is False, WarpX essentially does the
particle discarding, so each processor should inject the whole number
of particles to start.
Arguments:
npart_total (int): Integer number of total particles to insert this
timestep.
unique_particles (bool): If True, WarpX keeps all particles sent to
it. If False, it only keeps a processor's fraction of total
particles.
Returns:
npart (int): Integer number of total particles for this processor
to insert this timestep.
"""
if not unique_particles:
return npart_total
npart = npart_total // mwxrun.n_procs
# Early-numbered processors add one additional particle if needed.
# Particles get re-distributed between processors after injection, so
# this shouldn't load-imbalance anything.
if mwxrun.me < (npart_total % mwxrun.n_procs):
npart += 1
return npart
[docs] def getvoltage_e(self):
"""Return the electrical voltage of the injector. Defaults to returning
0, unless an emitter is associated with this injector (it should be) in
which case return the emitter's electrical voltage.
Child classes can override this if needed.
"""
if self.emitter is not None:
return self.emitter.getvoltage_e()
return 0.
[docs] def init_injectedparticles(self, fieldlist):
"""Set up the injected particles array. Call before
append_injectedparticles.
Arguments:
fieldlist (list): List of string titles for the fields. Order is
important; it must match the order for future particle appends
that are made.
"""
self._injectedparticles_fields = fieldlist
self._injectedparticles_data = appendablearray.AppendableArray(
typecode='d', unitshape=[len(fieldlist)])
[docs] def record_injectedparticles(self, species, w=0, E_total=None,
n=None):
"""Handles transforming raw particle information to the information
used to record particles as a function of time. Also handles parallel
sum and appending to the data array the current amount of injection.
Note:
Assumes the fixed form of fields given in Injector(). Doesn't
check since this is called many times.
Since a parallelsum is performed on ``_injectedparticles_data``
when calling ``get_injectedparticles``, this function has to be
called by all processors. Call this function with only the species
argument if no particles are being added by this processor.
Arguments:
species (:class:`mewarpx.mespecies.Species`): Species of particle
w (np.ndarray or float): Array of length npart with particle weights
E_total (np.ndarray or float): Array of length npart with E_total
values.
n (int): Number of macroparticles, _only_ needed if overriding the
length of E_total. This is useful mostly in the case that
E_total is already summed over particles, in which case a
single number can be passed for it rather than an array.
"""
if n is not None and np.size(w) != 1:
raise RuntimeError("Cannot pass array for w and specify n")
if n is None and np.size(w) == 1:
raise RuntimeError(
"Cannot pass single value for w and not specify n"
)
data = np.zeros(7)
# time for current step
data[0] = mwxrun.get_it() * mwxrun.get_dt()
# current step
data[1] = mwxrun.get_it()
# species ID
data[2] = species.species_number
# voltage of emitter
data[3] = self.getvoltage_e()
# number of macroparticles
data[4] = n if np.size(w) == 1 else np.size(w)
# total charge emitted
data[5] = species.sq * np.sum(w)
if E_total is not None:
data[6] = np.sum(E_total)
self.append_injectedparticles(data)
[docs] def append_injectedparticles(self, data):
"""Append one or more lines of injected particles data.
Arguments:
data (np.ndarray): Array of shape (m) or (n, m) where m is the
number of fields and n is the number of rows of data to append.
"""
self._injectedparticles_data.append(data)
[docs] def get_injectedparticles(self, clear=False):
"""Retrieve a copy of injectedparticles data.
Arguments:
clear (bool): If True, clear the particle data rows entered (field
names are still initialized as before). Default False.
Returns:
injectedparticles_dict (collections.OrderedDict): Keys are the
originally passed field strings for lost particles. Values are
an (n)-shape numpy array for each field.
"""
lpdata = self._injectedparticles_data.data()
# Sum all except t/step/species_id/V_e from all processors
lpdata[:, 4:] = parallel_util.parallelsum(np.array(lpdata[:, 4:]))
lpdict = collections.OrderedDict(
[(fieldname, np.array(lpdata[:, ii], copy=True))
for ii, fieldname in enumerate(self._injectedparticles_fields)])
if clear:
self._injectedparticles_data.cleardata()
return lpdict
[docs]class FixedNumberInjector(Injector):
"""Inject n particles every t timesteps."""
def __init__(self, emitter, species, npart,
injectfreq=None, injectoffset=1,
weight=0., rseed=None,
name=None, unique_particles=True):
"""Sets up user-specified injection with fixed timestep and weights.
Arguments:
emitter (:class:`mewarpx.emission.BaseEmitter`): Emitter object that
will specify positions and velocities of particles to inject.
species (picmi.Species): Premade species to inject particles of.
npart (int): Number of particles to inject total
injectfreq (int): Number of steps to wait for next injection.
Default infinity.
injectoffset (int): First timestep to inject. Default 1 (the
first possible timestep in WarpX).
weight (float): Macroparticle weight to be introduced.
rseed (int): If specified, all injection should be repeatable using
this rseed. At present each set of injected particles will have
the same initial position and velocities as the previous set.
name (str): Injector name for diagnostics. Constructed from
speciesname if not given.
unique_particles (bool): Whether WarpX will keep all particles
given it from every processor (True) or keep only a fraction of
particles based on processor count (False).
"""
# Save class parameters
self.emitter = emitter
self.species = species
self.npart_total = npart
self.injectfreq = injectfreq
if self.injectfreq is None:
self.injectfreq = np.inf
self.injectoffset = injectoffset
self.weight = weight
self.rseed = rseed
self.name = name
if self.name is None:
self.name = "fixed_injector_" + self.species.name
self.unique_particles = unique_particles
logger.info(
f"Fixed injection of {self.npart_total} particles, "
f"weight {self.weight}, every {self.injectfreq}"
f"timesteps."
)
callbacks.installparticleinjection(self.inject_particles)
# add E_total PID to this species
self.species.add_pid("E_total")
[docs] def inject_particles(self):
"""Perform the actual injection!"""
effective_it = mwxrun.get_it() - self.injectoffset
if effective_it >= 0 and effective_it % self.injectfreq == 0:
# Adjust npart for processor number if needed
npart = self.compute_npart(
npart_total=self.npart_total,
unique_particles=self.unique_particles
)
# TODO randomdt and velhalfstep are False simply because they're
# not supported at present
particles_dict = self.emitter.get_newparticles(
npart=npart, w=self.weight,
q=self.species.sq, m=self.species.sm,
rseed=self.rseed,
randomdt=False, velhalfstep=False
)
logger.info(f"Inject {len(particles_dict['x'])} particles")
# Note some parts of WarpX call the variables ux and some parts vx,
# and they're referred to as momenta. But I don't see anywhere
# they're actually used as momenta including the particle mass -
# the actual update is in Source/Particles/Pusher/UpdatePosition.H
mwxrun.sim_ext.add_particles(
self.species.name,
x=particles_dict['x'],
y=particles_dict['y'],
z=particles_dict['z'],
ux=particles_dict['vx'],
uy=particles_dict['vy'],
uz=particles_dict['vz'],
w=particles_dict['w'],
E_total=particles_dict['E_total'],
unique_particles=self.unique_particles
)
if self.injector_diag is not None:
self.record_injectedparticles(
species=self.species,
w=particles_dict['w'],
E_total=particles_dict['E_total'],
)
[docs]class ThermionicInjector(Injector):
"""Performs standard every-timestep injection from a thermionic cathode."""
def __init__(self, emitter, species, npart_per_cellstep, T=None,
WF=None, A=constants.A0*1e4, use_Schottky=True,
Schottky_factor=1.0,
allow_poisson=False, wfac=1.0,
name=None, profile_decorator=None,
unique_particles=True):
"""Sets up user-specified injection for warpX.
Arguments:
emitter (:class:`mewarpx.emission.Emitter`): Emitter object that
will specify positions and velocities of particles to inject.
species (mewarpx.mespecies.Species): A premade species. Note only
electrons will actually give physically meaningful weight
calculations.
npart_per_cellstep (int): Number of macroparticles to inject per
cell on the cathode surface per timestep
T (float): Cathode temperature (K). Uses emitter T if not specified.
WF (float): Cathode work function (eV). Uses WF of the conductor
associated with the emitter if not specified.
A (float): Coefficient of emission in Amp/m^2/K^2. Default is
the theoretical max, approximately 1.2e6.
use_Schottky (bool): Flag specifying whether or not to augment the
emission current via field-dependent particle weights.
Defaults to True.
Schottky_factor (float): A prefactor on Schottky enhancement to
simulate "anomalous Schottky enhancement" if greater than 1.
Default 1.0.
allow_poisson (bool): If True and < npart_per_cellstep electrons
would be injected per cell, inject whole electrons with a
Poisson distribution. If False, inject fractions of electrons.
Default False.
wfac (float): Constant factor applied to variable particle
weights, which changes the actual injection weight from the
physically calculated quantity. Currently used only for
testing, or for e.g. artificially lowering weight of trace
particles.
name (str or None): Injector name for diagnostics. Constructed from
speciesname if not given.
profile_decorator (decorator): A decorator used to profile the
injection methods and related functions.
unique_particles (bool): Whether WarpX will keep all particles
given it from every processor (True) or keep only a fraction of
particles based on processor count (False). Default True.
"""
# sanity check species
if species.particle_type != 'electron':
raise AttributeError(
"Thermionic emission is only applicable with electrons as the "
f"injection species, but species type {species.particle_type} "
"was given."
)
# Save class parameters
self.emitter = emitter
self.species = species
self.T = T
self.WF = WF
self.A = A
self.use_Schottky = use_Schottky
self.Schottky_factor = Schottky_factor
self.wfac = wfac
# Get values from the emitter and its conductor if not specified
if self.T is None:
self.T = self.emitter.T
if self.WF is None:
self.WF = self.emitter.conductor.WF
if profile_decorator is not None:
self.inject_particles = profile_decorator(self.inject_particles)
self.record_injectedparticles = (
profile_decorator(self.record_injectedparticles)
)
self.name = name
if self.name is None:
self.name = "thermionic_injector_" + self.species.name
self.unique_particles = unique_particles
area = self.emitter.area
dt = mwxrun.get_dt()
if (area is None) or (area <= 0.0) or (dt <= 0.0):
raise ValueError(f"area {area} or dt {dt}"
f" is invalid for injection.")
# Determine weight and injection numbers
electrons_per_step = (mwxutil.J_RD(self.T, self.WF, self.A)
* area * dt / picmi.constants.q_e)
logger.info(
f"Setting up thermionic particle injection. Area {area:.3g} m^2, "
f"dt {dt:.3e} s, J {mwxutil.J_RD(self.T, self.WF, self.A):.3g} "
"A/m^2. Schottky enhancement is "
f"{'on' if self.use_Schottky else 'off'}."
)
if self.use_Schottky and (self.Schottky_factor != 1.0):
logger.info(
"Schottky enhancement multiplied by factor "
f"{self.Schottky_factor:.3g}."
)
logger.info(
"Emission current corresponds to injection of "
f"{electrons_per_step:.2e} electrons per timestep"
)
max_injections = int(round(npart_per_cellstep *
self.emitter.cell_count))
# If it was requested to inject more particles than we have electrons,
# we instead inject electrons with a poisson distribution if allowed.
if electrons_per_step < max_injections and allow_poisson:
self.ptcl_per_step = electrons_per_step
self.weight = self.wfac
self.poisson = True
logger.info(
"Using stochastic injection of electrons with "
"Poisson sampling"
)
else:
self.ptcl_per_step = max_injections
self.weight = self.wfac * electrons_per_step / self.ptcl_per_step
self.poisson = False
logger.info(
f"Using deterministic injection of {self.ptcl_per_step} "
f"particles per step, each with weight {self.weight}"
)
# create new species that will be used to properly distribute new
# particles and retrieve the electric field at their injection sites in
# order to calculate Schottky enhancement
if self.use_Schottky:
self.injection_species = Species(
particle_type='electron', name=self.species.name+'_injection'
)
else:
self.injection_species = self.species
callbacks.installparticleinjection(self.inject_particles)
# add E_total PID to this species
self.species.add_pid("E_total")
self.injection_species.add_pid("E_total")
if self.use_Schottky:
# add PIDs to hold the normal vector
# TODO work out a better way to handle these PIDs since this is not
# a great use of memory
self.species.add_pid("norm_x")
self.species.add_pid("norm_y")
self.species.add_pid("norm_z")
self.injection_species.add_pid("norm_x")
self.injection_species.add_pid("norm_y")
self.injection_species.add_pid("norm_z")
[docs] def inject_particles(self):
"""Perform the actual injection!"""
if self.poisson:
num_injections = np.random.poisson(self.ptcl_per_step)
else:
num_injections = self.ptcl_per_step
# Adjust npart for processor number if needed
npart = self.compute_npart(
npart_total=num_injections,
unique_particles=self.unique_particles
)
# TODO randomdt and velhalfstep are False simply because they're
# not supported at present
particles_dict = self.emitter.get_newparticles(
npart=npart, w=self.weight, q=self.species.sq, m=self.species.sm,
randomdt=False, velhalfstep=False
)
extra_pids = {}
extra_pids['E_total'] = particles_dict['E_total']
extra_pids['w'] = particles_dict['w']
if self.use_Schottky:
# Determine the local surface normal for each particle
normal_vectors = self.emitter.get_normals(
particles_dict['x'], particles_dict['y'], particles_dict['z']
)
extra_pids['norm_x'] = normal_vectors[:, 0]
extra_pids['norm_y'] = normal_vectors[:, 1]
extra_pids['norm_z'] = normal_vectors[:, 2]
# Note some parts of WarpX call the variables ux and some parts vx,
# and they're referred to as momenta. But I don't see anywhere
# they're actually used as momenta including the particle mass -
# the actual update is in Source/Particles/Pusher/UpdatePosition.H
mwxrun.sim_ext.add_particles(
self.injection_species.name,
x=particles_dict['x'],
y=particles_dict['y'],
z=particles_dict['z'],
ux=particles_dict['vx'],
uy=particles_dict['vy'],
uz=particles_dict['vz'],
unique_particles=self.unique_particles,
**extra_pids
)
if self.use_Schottky:
# Up-weight the particles by the local Schottky factor, calculated
# as exp[sqrt(e / 4*pi*eps0) / (kT) * sqrt(max(-E, 0))]
pre_fac = self.Schottky_factor * (
np.sqrt(constants.e / (4.0 * np.pi * constants.epsilon_0))
/ (constants.kb_eV * self.emitter.T)
)
mwxrun.calc_Schottky_weight(
self.injection_species.name, pre_fac
)
# get the total injected weight and energy
total_weight = 0.
total_energy = 0.
npart = 0
weight_arrays = mwxrun.sim_ext.get_particle_arrays(
self.injection_species.name, 'w', 0
)
ux_arrays = mwxrun.sim_ext.get_particle_arrays(
self.injection_species.name, 'ux', 0
)
uy_arrays = mwxrun.sim_ext.get_particle_arrays(
self.injection_species.name, 'uy', 0
)
uz_arrays = mwxrun.sim_ext.get_particle_arrays(
self.injection_species.name, 'uz', 0
)
for ii, w in enumerate(weight_arrays):
npart += len(w)
total_weight += np.sum(w)
total_energy += np.sum(self.emitter._get_E_total(
ux_arrays[ii], uy_arrays[ii], uz_arrays[ii],
constants.e, constants.m_e, w
))
# Move particles from temporary container to "real" container
mwxrun.move_particles_between_species(
self.injection_species.name, self.species.name
)
else:
total_weight = np.sum(particles_dict['w'])
total_energy = np.sum(particles_dict['E_total'])
if self.injector_diag is not None:
self.record_injectedparticles(
species=self.species,
w=total_weight,
E_total=total_energy,
n=npart
)
[docs]class PlasmaInjector(Injector):
"""Inject particles at simulation start, or at regular timesteps, to
seed a plasma. Can use any emitter object. The defining feature is that the
2nd species positions and weights are copied from the first species, so the
spatial distribution is always identical to start. Velocities are
independent, however.
"""
def __init__(self, emitter, species1, species2, npart, T_2=None,
plasma_density=None, ionization_frac=None,
P_neutral=None, T_neutral=None,
injectfreq=None, injectoffset=1,
rseed=None, name=None, unique_particles=True
):
"""Initialize injection of a plasma with two species and given emitter.
Arguments:
emitter (:class:`mewarpx.emission.BaseEmitter`): BaseEmitter object
that will specify positions and velocities of particles to
inject.
species1 (:class:`mewarpx.mespecies.Species`): First species, eg
electron
species2 (:class:`mewarpx.mespecies.Species`): Second species, eg ion
npart (int): Number of macroparticles to inject total among all
processors and species.
T_2 (float): If specified, species2 will be injected at this
temperature.
plasma_density (float): Ion number density to inject. If using
volumetric emitter, in m^(-3), if using surface emitter, in
m^(-2)
ionization_frac (float): Instead of plasma_density, use a specific
ionization fraction of the neutral gas. Volumetric emitter
only.
P_neutral (float): If using ionization_frac only, the neutral gas
density (*Torr*).
T_neutral (float): If using ionization_frac only, the neutral gas
temperature (K).
injectfreq (int): Number of steps to wait for next injection.
Default infinity.
injectoffset (int): First timestep to inject. Default 1 (the first
possible timestep in WarpX).
rseed (int): If specified, all injection should be repeatable using
this rseed. At present each set of injected particles will have
the same initial position and velocities as the previous set.
name (str or None): Injector name for diagnostics. Constructed from
species names if not given.
unique_particles (bool): Whether WarpX will keep all particles
given it from every processor (True) or keep only a fraction of
particles based on processor count (False). Default True.
"""
# Save class parameters
self.emitter = emitter
self.npart_per_species = npart // 2
self.species1 = species1
self.species2 = species2
self.T_2 = T_2
if injectfreq is None:
injectfreq = np.inf
self.injectfreq = injectfreq
self.injectoffset = injectoffset
self.rseed = rseed
# Don't calculate plasma density if seeding with arbitrary plasma density
if isinstance(self.emitter, ArbitraryDistributionVolumeEmitter):
self.plasma_density = None
else:
self._calc_plasma_density(
plasma_density=plasma_density,
ionization_frac=ionization_frac,
P_neutral=P_neutral,
T_neutral=T_neutral,
)
self.name = name
if self.name is None:
self.name = (
f"plasma_injector_{self.species1.name}_{self.species2.name}"
)
self.unique_particles = unique_particles
logger.info(
f"Plasma injection {self.name}: "
f"{self.npart_per_species} particles each of {self.species1.name} "
f"and {self.species2.name}, every {self.injectfreq} timesteps,"
)
# Surface emission
if isinstance(self.emitter, Emitter):
self.weight = (
self.emitter.area * self.plasma_density / self.npart_per_species
)
warnings.warn(
"Using a surface emitter with the PlasmaInjector has not been "
"tested for accuracy."
)
logger.info(
f" full weight {self.weight:.4g}, surface density "
f"{self.plasma_density:.4g} m^-2, area "
f"{self.emitter.area:.4g} m^2."
)
# give placeholder weight which will be overwritten for
# ArbitraryDistributionVolumeEmitter
elif isinstance(self.emitter, ArbitraryDistributionVolumeEmitter):
self.weight = 0
# Volume emission
else:
self.weight = (
self.emitter.volume * self.plasma_density
/ self.npart_per_species
)
logger.info(
f" full weight {self.weight:.4g}, volume density "
f"{self.plasma_density:.4g} m^-3, volume "
f"{self.emitter.volume:.4g} m^3."
)
debye_length = mwxutil.plasma_Debye_length(
self.emitter.T, self.plasma_density)
logger.info(
f" Corresponding plasma Debye length is {debye_length:.3e} m."
)
callbacks.installparticleinjection(self.inject_particles)
# add E_total PID to the species involved
self.species1.add_pid("E_total")
self.species2.add_pid("E_total")
def _calc_plasma_density(self, plasma_density, ionization_frac, P_neutral,
T_neutral):
"""Helper function to separate out part of initialization."""
self.plasma_density = plasma_density
if ionization_frac is not None:
if self.plasma_density is not None:
raise ValueError(
"Specify ionization_frac or plasma_density, not both.")
if (
(P_neutral is None) or (P_neutral <= 0) or
(T_neutral is None) or (T_neutral <= 0)
):
raise ValueError("Must specify positive neutral pressure and "
"temperature to use ionization_frac.")
if isinstance(self.emitter, Emitter):
raise RuntimeError("Cannot use ionization_frac with a surface"
" (area-based) Emitter.")
n_neutral = mwxutil.ideal_gas_density(P_neutral, T_neutral)
self.plasma_density = n_neutral * ionization_frac
if (self.plasma_density is None) or (self.plasma_density <= 0):
raise ValueError("Invalid plasma_density {}".format(
self.plasma_density))
[docs] def inject_particles(self):
"""Inject particles, same position & weight for each."""
effective_it = mwxrun.get_it() - self.injectoffset
if effective_it >= 0 and effective_it % self.injectfreq == 0:
# Adjust npart for processor number if needed
npart = self.compute_npart(
npart_total=self.npart_per_species,
unique_particles=self.unique_particles
)
# TODO randomdt and velhalfstep are False simply because they're
# not supported at present
particles1_dict = self.emitter.get_newparticles(
npart=npart, w=self.weight, q=self.species1.sq,
m=self.species1.sm, rseed=self.rseed,
randomdt=False, velhalfstep=False
)
# if requested get particles for species2 at the specified
# temperature
if self.T_2 is not None:
T_temp, self.emitter.T = self.emitter.T, self.T_2
# TODO randomdt and velhalfstep are False simply because they're
# not supported at present
particles2_dict = self.emitter.get_newparticles(
npart=npart, w=self.weight, q=self.species2.sq,
m=self.species2.sm, rseed=self.rseed,
randomdt=False, velhalfstep=False
)
if self.T_2 is not None:
self.emitter.T = T_temp
for key in ['x', 'y', 'z', 'w']:
particles2_dict[key] = particles1_dict[key]
logger.info(
f"Inject {len(particles1_dict['x'])} particles each of "
f"{self.species1.name} and {self.species2.name}."
)
mwxrun.sim_ext.add_particles(
self.species1.name,
x=particles1_dict['x'],
y=particles1_dict['y'],
z=particles1_dict['z'],
ux=particles1_dict['vx'],
uy=particles1_dict['vy'],
uz=particles1_dict['vz'],
w=particles1_dict['w'],
E_total=particles1_dict['E_total'],
unique_particles=self.unique_particles
)
mwxrun.sim_ext.add_particles(
self.species2.name,
x=particles2_dict['x'],
y=particles2_dict['y'],
z=particles2_dict['z'],
ux=particles2_dict['vx'],
uy=particles2_dict['vy'],
uz=particles2_dict['vz'],
w=particles2_dict['w'],
E_total=particles2_dict['E_total'],
unique_particles=self.unique_particles
)
if self.injector_diag is not None:
self.record_injectedparticles(
species=self.species1,
w=particles1_dict['w'],
E_total=particles1_dict['E_total'],
)
self.record_injectedparticles(
species=self.species2,
w=particles2_dict['w'],
E_total=particles2_dict['E_total'],
)
[docs]class BaseEmitter(object):
"""Parent class of both Emitter (which handles injection from a surface or
other area) and VolumeEmitter (which handles injection throughout a
volume).
All BaseEmitter objects are expected to contain:
- ``get_newparticles()`` returns coordinates, velocities, and KE in a
dict - implemented here
- ``_get_xv_coords()`` implements the subclass-specific particle
injection logic
- ``getvoltage()`` calculates the potential energy for particle
energies.
- ``getvoltage_e()`` calculates the potential energy for particle
energies including the work function.
- ``geoms`` is a property containing a list of simulation geometries
supported by the Emitter, as strings
"""
# Stores a list of functions that are used to adjust variable particle
# weights.
_wfnlist = None
# Needs to be overridden to specify acceptable geometries
geoms = []
def __init__(self):
"""Check geometry and any other universal initialization.
"""
self.solver_geom = self.check_geom()
# # Use to get E and phi as needed.
# self.particle_helper = ParticleValHelper()
[docs] def check_geom(self):
"""Return the current solver geometry, or throw an error if it is
unsupported by the Emitter.
"""
geom = mwxrun.geom_str
if geom not in self.geoms:
raise ValueError(
f"{geom} geometry not supported by this Emitter")
return geom
[docs] def getvoltage(self):
"""This should return the potential energy at the injection site for
fully accurate energetics.
"""
raise NotImplementedError
[docs] def getvoltage_e(self):
"""This should return the potential energy, including work function,
at the injection site for fully accurate energetics.
"""
raise NotImplementedError
@staticmethod
def _gen_particle_dict(x, y, z, vx, vy, vz, w, **kwargs):
"""Change standard arrays into format expected by an injector.
The transfer to an injector uses a dict so that optional
arguments can be passed, or additional arguments added.
Arguments:
x (np.ndarray): n-shape position array
y (np.ndarray): n-shape position array
z (np.ndarray): n-shape position array
vx (np.ndarray): n-shape velocity array
vy (np.ndarray): n-shape velocity array
vz (np.ndarray): n-shape velocity array
w (float or np.ndarray): Particle weight, either constant or
per-particle.
kwargs (np.ndarray): These are simply copied into the dictionary
"""
particle_dict = {
'x': x, 'y': y, 'z': z,
'vx': vx, 'vy': vy, 'vz': vz,
'w': np.ones_like(x) * w
}
particle_dict.update(kwargs)
return particle_dict
def _get_E_total(self, vx, vy, vz, q, m, w):
"""Calculate initial particle energies.
Note:
The conductor voltage V of the conductor the particle is ejected
from must also be set for this object.
Arguments:
vx (np.ndarray): n-length array of velocity x-components
vy (np.ndarray): n-length array of velocity y-components
vz (np.ndarray): n-length array of velocity z-components
q (float): Charge of the particles, usually species.sq.
m (float): Mass of the particles, usually species.sm.
w (np.ndarray): Variable particle weight, n-shape
"""
V = self.getvoltage()
E_total = w*(0.5*m*(vx**2 + vy**2 + vz**2) + q*V)
return E_total
[docs] def get_newparticles(self, npart, w, q, m, rseed=None,
randomdt=True, velhalfstep=True):
"""Return dict with coordinates, velocities, and KE
Note:
This function SHOULD (but doesn't in WarpX yet) handle the random
timestep advancement and the negative half-step velocity push. They
can be turned off if desired. No leapfrogging is done in the
initial random advancement, which could be a (hopefully very minor)
source of error.
Arguments:
npart (int): Total number of particles to inject
w (float): Weight of the particles
q (float): Charge of the particles, usually species.sq.
m (float): Mass of the particles, usually species.sm. Equals
electron mass if not otherwise specified.
rseed (int): Random seed, if specified, can be used to provide
reproducible results. Typically used for test / not production
runs.
randomdt (bool): If True, move each particle ahead a random delta t
in [0, dt), advancing both position and velocity together.
Default True.
velhalfstep (bool): If True, push the velocities a negative
half-step using the E-field. Aligns position and velocities
correctly for the leapfrog algorithm.
Returns:
particle_dict (dict): Contains lists, each with length equal to the
number of particles:
- ``x``, ``y``, and ``z`` contain initial positions
- ``vx``, ``vy``, and ``vz`` contain initial velocities
- ``E_total`` contains initial energy of each particle, kinetic
& potential.
- ``w`` contains particle weights.
"""
if rseed is not None:
nprstate = np.random.get_state()
np.random.seed(rseed)
rseedxv = np.random.randint(1000000000)
rseedt = np.random.randint(1000000000)
else:
rseedxv = None
rseedt = None
x, y, z, vx, vy, vz = self._get_xv_coords(
npart=npart, m=m, rseed=rseedxv
)
particle_dict = self._gen_particle_dict(
x=x, y=y, z=z, vx=vx, vy=vy, vz=vz, w=w
)
if self._wfnlist is not None:
for wfn in self._wfnlist:
particle_dict['w'] = wfn(particle_dict)
particle_dict['E_total'] = self._get_E_total(
vx=particle_dict['vx'],
vy=particle_dict['vy'],
vz=particle_dict['vz'],
q=q, m=m, w=particle_dict['w']
)
# After E_total has been computed, we advance particles as needed.
if randomdt:
self.particle_helper.advance_random_deltat(
particle_dict['x'], particle_dict['y'], particle_dict['z'],
particle_dict['vx'], particle_dict['vy'], particle_dict['vz'],
q=q, m=m, rseed=rseedt
)
if velhalfstep:
self.particle_helper.push_v_minus_halfstep(
particle_dict['x'], particle_dict['y'], particle_dict['z'],
particle_dict['vx'], particle_dict['vy'], particle_dict['vz'],
q=q, m=m
)
if rseed is not None:
np.random.set_state(nprstate)
return particle_dict
def _update_params(self):
"""Update local parameters if needed based on WarpX settings.
By default does nothing, but subclasses can implement it to update
parameters before new particle coordinates are generated.
"""
pass
def _get_xv_coords(self, npart, m, rseed):
"""Per-subclass implementation of generating new particle data.
See :func:`mewarpx.emission.BaseEmitter.get_newparticles` for details on
arguments.
Returns:
x, y, z, vx, vy, vz (np.array): Each must be a 1D numpy array.
"""
raise NotImplementedError(
"BaseEmitter subclasses must implement _get_xv_coords")
[docs] def add_wfn(self, wfn):
"""Add a variable weight function to the emitter.
Arguments:
wfn (function): This must take in a particle dictionary with
positions, velocities, and existing weights, and return a new
array of particle weights.
"""
if self._wfnlist is None:
self._wfnlist = []
self._wfnlist.append(wfn)
[docs]class Emitter(BaseEmitter):
"""Parent class for emission from a surface.
All Emitter objects are expected to contain:
- ``area`` is a property containing the area in m^2
- ``cell_count`` is a property containing the number of mesh cells
spanned by the Emitter
- ``geoms`` is a property containing a list of simulation geometries
supported by the Emitter
- ``_get_xv_coords()`` implements the subclass-specific particle
injection logic
- ``get_normals()`` returns the normals for a set of particle
coordinates.
"""
area = None
cell_count = None
geoms = []
def __init__(self, T, conductor=None, emission_type='thermionic'):
"""Default initialization for all Emitter objects.
Arguments:
T (float): Emitter temperature in Kelvin. Determines particle
velocity distribution. If None, the temperature of the
conductor will be used if one is specified.
conductor (assemblies.Assembly): Conductor the emitter is attached
to, used for recording initial voltages and energies. If None,
V_e is set to 0. Since there's no current use case for this, a
warning is printed.
emission_type (str): Distribution function type used to sample
velocities of the emitted particles. Must be defined in
:func:`mewarpx.utils_store.util.get_velocities`. Defaults to
'thermionic'.
"""
super(Emitter, self).__init__()
self.T = T
if self.T is None and conductor is not None:
self.T = conductor.T
if self.T is None:
raise ValueError(
"No value for T given to the Emitter. An Emitter T must be "
"specified directly, or on a conductor passed to the Emitter."
)
self.conductor = conductor
if self.conductor is not None:
if self.conductor.WF <= 0.:
raise ValueError("Conductor WF must be set for emitters.")
else:
warnings.warn("No conductor set for emitter. Power will not be "
"correct.")
self.emission_type = emission_type
[docs] def getvoltage(self):
if self.conductor is None:
return 0.
return self.conductor.getvoltage()
[docs] def getvoltage_e(self):
"""Electrical voltage includes WF, eg the Fermi level voltage."""
if self.conductor is None:
return 0.
return self.conductor.getvoltage() + self.conductor.WF
[docs] def get_normals(self, x, y, z):
"""Calculate local surface normal at specified coordinates.
Arguments:
x (np.ndarray): x-coordinates of emitted particles (in meters).
y (np.ndarray): y-coordinates of emitted particles (in meters).
z (np.ndarray): z-coordinates of emitted particles (in meters).
Returns:
normals (np.ndarray): nx3 array containing the outward surface
normal vector at each particle location.
"""
raise NotImplementedError("Normal calculations must be implemented by "
"Emitter sub-classes.")
[docs]class ZPlaneEmitter(Emitter):
"""This is the standard injection for a planar cathode."""
geoms = ['Z', 'XZ', 'XYZ']
def __init__(self, conductor, T=None, xmin=None, xmax=None,
ymin=None, ymax=None, transverse_fac=1.0, **kwargs):
"""Initialize an emitter for a planar cathode.
Arguments:
conductor (:class:`mewarpx.assemblies.Assembly`): Conductor object,
used to obtain work function and z coordinate/direction.
T (float): Temperature in Kelvin for the emitter; determines
velocities. If not specified the temperature of the conductor
will be used.
xmin (float): Minimum position of the rectangular emitter along x.
Default mwxrun.xmin.
xmax (float): Maximum position of the rectangular emitter along x.
Default mwxrun.xmax.
ymin (float): Minimum position of the rectangular emitter along y.
Default mwxrun.ymin.
ymax (float): Maximum position of the rectangular emitter along y.
Default mwxrun.ymax.
transverse_fac (float): Scale the transverse energy distribution by
this factor. Default 1. See
:func:`mewarpx.utils_store.util.get_velocities` for details.
kwargs (dict): Any other keyword arguments supported by the parent
Emitter constructor (such as "emission_type").
"""
# Default initialization
super(ZPlaneEmitter, self).__init__(T=T, conductor=conductor, **kwargs)
self.z = conductor.z
self.zsign = conductor.zsign
self.transverse_fac = transverse_fac
# Determine bounds
# Will be 4 element array [xmin, xmax, ymin, ymax]
self.bounds = []
for coord, default in [(xmin, mwxrun.xmin),
(xmax, mwxrun.xmax),
(ymin, mwxrun.ymin),
(ymax, mwxrun.ymax)]:
self.bounds.append(coord if coord is not None else default)
# Compute area
x_range = self.bounds[1] - self.bounds[0]
y_range = self.bounds[3] - self.bounds[2]
if self.solver_geom == 'Z':
logger.info("x/y span is 1m for purposes of charge injection")
x_range = 1.
y_range = 1.
if self.solver_geom == 'XZ':
logger.info("y span is 1m for purposes of charge injection")
y_range = 1.
self.area = x_range * y_range
# Compute cell count
if self.solver_geom == 'Z':
self.cell_count = 1
elif self.solver_geom == 'XZ':
self.cell_count = self.area / mwxrun.dx
else:
self.cell_count = self.area / (mwxrun.dx * mwxrun.dy)
def _get_xv_coords(self, npart, m, rseed):
"""Get particle coordinates given particle number.
See :func:`mewarpx.emission.BaseEmitter.get_newparticles` for details.
"""
if rseed is not None:
nprstate = np.random.get_state()
np.random.seed(rseed)
rseedv = np.random.randint(1000000000)
rseedx = np.random.randint(1000000000)
else:
rseedv = None
rseedx = None
vx, vy, vz = mwxutil.get_velocities(
npart, self.T, m=m, transverse_fac=self.transverse_fac,
emission_type=self.emission_type, rseed=rseedv)
x, y, z = mwxutil.get_positions(
npart, xmin=self.bounds[0], xmax=self.bounds[1],
ymin=self.bounds[2], ymax=self.bounds[3], z=self.z,
rseed=rseedx)
# Flip z velocities for anode emission. This appears to be faster than
# an if statement for 10000 or fewer particles.
vz = -self.zsign * vz
if rseed is not None:
np.random.set_state(nprstate)
return x, y, z, vx, vy, vz
[docs] def get_normals(self, x, y, z):
"""Calculate local surface normal at specified coordinates.
Arguments:
x (np.ndarray): x-coordinates of emitted particles (in meters).
y (np.ndarray): y-coordinates of emitted particles (in meters).
z (np.ndarray): z-coordinates of emitted particles (in meters).
Returns:
normals (np.ndarray): nx3 array containing the outward surface
normal vector at each particle location.
"""
normals = np.zeros((len(x), 3))
normals[:, 2] = -self.zsign
return normals
[docs]class ZPlanePatchyEmitter(object):
"""Injection for a patchy cathode."""
geoms = ['XZ']
def __init__(self, patchy_cathode, transverse_fac=1.0, **kwargs):
"""Initialize an emitter for a planar patchy cathode.
Arguments:
patchy_cathode (:class:`mewarpx.assemblies.PatchyCathode`): Patchy
cathode object, used to obtain work functions, patch size and z
coordinate/direction.
transverse_fac (float): Scale the transverse energy distribution by
this factor. Default 1. See
:func:`mewarpx.utils_store.util.get_velocities` for details.
kwargs (dict): Any other keyword arguments supported by the
Emitter constructor (such as "emission_type").
"""
self.patchy_cathode = patchy_cathode
# The tricky part here is that we need to calculate the appropriate
# size for each patch set knowing that the domain is not necessarily
# an even integer multiple of the patch size. Note that the
# PatchyCathode is hard coded to have a high WF patch start at x=0.
patch_num = abs(mwxrun.xmax) / self.patchy_cathode.patch_size
full_patches = np.floor(patch_num)
if full_patches % 2.0:
high_wf_patches = (full_patches + 1.0) / 2.0
else:
high_wf_patches = full_patches / 2.0 + (patch_num - full_patches)
patch_num = abs(mwxrun.xmin) / self.patchy_cathode.patch_size
full_patches = np.floor(patch_num)
if full_patches % 2.0:
high_wf_patches += (
(full_patches - 1.0) / 2.0 + (patch_num - full_patches)
)
else:
high_wf_patches += full_patches / 2.0
high_wf_length = high_wf_patches * self.patchy_cathode.patch_size
low_wf_length = mwxrun.xmax - mwxrun.xmin - high_wf_length
# Default initialization for the two sets of patches
self.high_wf = ZPlanePatchSet(
conductor=self.patchy_cathode.cathode_list[0],
xmin=-high_wf_length/2.0, xmax=high_wf_length/2.0, **kwargs
)
self.low_wf = ZPlanePatchSet(
conductor=self.patchy_cathode.cathode_list[1],
xmin=-low_wf_length/2.0, xmax=low_wf_length/2.0, **kwargs
)
[docs]class ZPlanePatchSet(ZPlaneEmitter):
"""Emitter for a set of single WF patches. This is meant to be used by the
``ZPlanePatchyEmitter``, not directly."""
def _get_xv_coords(self, npart, m, rseed):
"""Get particle coordinates given particle number.
See parent class function for details.
"""
x, y, z, vx, vy, vz = super()._get_xv_coords(npart, m, rseed)
# move x to be centered on conductor.x_start (assumes
# ``ZPlanePatchyEmitter`` set the emitter up centered at x=0)
x += self.conductor.x_start
point = self.conductor.x_start + self.conductor.patch_size
while point < mwxrun.xmax:
x[np.where(x > point)] += self.conductor.patch_size
point += 2.0*self.conductor.patch_size
point = self.conductor.x_start
while point > mwxrun.xmin:
x[np.where(x < point)] -= self.conductor.patch_size
point -= 2.0*self.conductor.patch_size
return x, y, z, vx, vy, vz
[docs]class XPlaneEmitter(Emitter):
"""Injection for a planar cathode emitting from the simulation side."""
geoms = ['XZ', 'XYZ']
def __init__(self, conductor, T=None, x=None, ymin=None, ymax=None,
zmin=None, zmax=None, transverse_fac=1.0, xdir=1, **kwargs):
"""Initialize an emitter for a planar cathode.
Arguments:
conductor (:class:`mewarpx.assemblies.Assembly`): Conductor object,
used to obtain work function and z coordinate/direction.
T (float): Temperature in Kelvin for the emitter; determines
velocities. If not specified the temperature of the conductor
will be used.
x (float): Position of the emitter along the x axis. Default
conductor.x if it exists otherwise conductor.xmin/max depending
on xdir. If none of those attributes exist an error will be
raised if x is not specified.
ymin (float): Minimum position of the rectangular emitter along y.
Default conductor.ymin if it exists otherwise mwxrun.ymin.
ymax (float): Maximum position of the rectangular emitter along y.
Default conductor.ymax if it exists otherwise mwxrun.ymax.
zmin (float): Minimum position of the rectangular emitter along z.
Default conductor.zmin if it exists otherwise mwxrun.zmin.
zmax (float): Maximum position of the rectangular emitter along z.
Default conductor.zmax if it exists otherwise mwxrun.zmax.
transverse_fac (float): Scale the transverse energy distribution by
this factor. Default 1. See
:func:`mewarpx.utils_store.util.get_velocities` for details.
xdir (int): 1 to emit in +x, -1 to emit in -x.
kwargs (dict): Any other keyword arguments supported by the parent
Emitter constructor (such as "emission_type").
"""
# Default initialization
super(XPlaneEmitter, self).__init__(T=T, conductor=conductor, **kwargs)
self.transverse_fac = transverse_fac
self.xdir = int(round(xdir))
if self.xdir not in [-1, 1]:
raise ValueError("xdir must be +1 or -1 for x-plane emitters.")
self.x = x
if self.x is None:
try:
self.x = conductor.x
except AttributeError:
try:
if self.xdir == 1:
attr = 'xmax'
else:
attr = 'xmin'
self.x = getattr(conductor, attr)
except AttributeError:
raise AttributeError(
'x must be specified for x-plane emitter if the '
f'attached conductor does not specify x or {attr}'
)
# Determine bounds
# Will be 4 element array [ymin, ymax, zmin, zmax]
self.bounds = []
for boundstr in ['ymin', 'ymax', 'zmin', 'zmax']:
bound = locals()[boundstr]
if bound is None:
bound = (
getattr(mwxrun, boundstr) if not hasattr(conductor, boundstr) else
getattr(conductor, boundstr)
)
self.bounds.append(bound)
# Compute area
y_range = self.bounds[1] - self.bounds[0]
z_range = self.bounds[3] - self.bounds[2]
if self.solver_geom == 'XZ':
logger.info("y span is 1m for purposes of charge injection")
y_range = 1.
self.area = y_range * z_range
# Compute cell count
if self.solver_geom == 'XZ':
self.cell_count = self.area / mwxrun.dz
else:
self.cell_count = self.area / (mwxrun.dz * mwxrun.dy)
def _get_xv_coords(self, npart, m, rseed):
"""Get particle coordinates given particle number.
See :func:`mewarpx.emission.BaseEmitter.get_newparticles` for details.
"""
if rseed is not None:
nprstate = np.random.get_state()
np.random.seed(rseed)
rseedv = np.random.randint(1000000000)
rseedx = np.random.randint(1000000000)
else:
rseedv = None
rseedx = None
# in sampling the positions and the velocities the x and z coordinates
# are swapped so that the same functions as for the ZPlaneEmitter can
# be used
vz, vy, vx = mwxutil.get_velocities(
npart, self.T, m=m, transverse_fac=self.transverse_fac,
emission_type=self.emission_type, rseed=rseedv)
z, y, x = mwxutil.get_positions(
npart, xmin=self.bounds[2], xmax=self.bounds[3],
ymin=self.bounds[0], ymax=self.bounds[1], z=self.x,
rseed=rseedx)
# Flip x velocities if needed. This appears to be faster than
# an if statement for 10000 or fewer particles.
vx = self.xdir * vx
if rseed is not None:
np.random.set_state(nprstate)
return x, y, z, vx, vy, vz
[docs] def get_normals(self, x, y, z):
"""Calculate local surface normal at specified coordinates.
Arguments:
x (np.ndarray): x-coordinates of emitted particles (in meters).
y (np.ndarray): y-coordinates of emitted particles (in meters).
z (np.ndarray): z-coordinates of emitted particles (in meters).
Returns:
normals (np.ndarray): nx3 array containing the outward surface
normal vector at each particle location.
"""
normals = np.zeros((len(x), 3))
normals[:, 0] = self.xdir
return normals
[docs]class ZDiscEmitter(Emitter):
"""This injects over an x-y disc rather than a rectangle."""
geoms = ['RZ']
def __init__(self, conductor, T=None, inner_emission_radius=None,
outer_emission_radius=None, transverse_fac=1.0, **kwargs):
"""Initialize an emitter for a disc (circular) cathode.
Arguments:
conductor (:class:`mewarpx.assemblies.Assembly`): Conductor object,
used to obtain work function and z coordinate/direction.
T (float): Temperature in Kelvin for the emitter; determines
velocities. If not specified the temperature of the conductor
will be used.
inner_emission_radius (float): Inner radius of the disc (in meters)
for particles to be emitted from. Default mwxrun.rmin.
outer_emission_radius (float): Outer radius of the disc (in meters)
for particles to be emitted from. Default mwxrun.rmax.
transverse_fac (float): Scale the transverse energy distribution by
this factor. Default 1. See
:func:`mewarpx.utils_store.util.get_velocities` for details.
kwargs (dict): Any other keyword arguments supported by the parent
Emitter constructor (such as "emission_type").
Notes:
The center of the disc is always x = y = 0 at present.
"""
# Default initialization
super(ZDiscEmitter, self).__init__(T=T, conductor=conductor, **kwargs)
self.z = conductor.z
self.zsign = conductor.zsign
self.transverse_fac = transverse_fac
# Save input parameters
if inner_emission_radius is None:
inner_emission_radius = mwxrun.rmin
self.inner_emission_radius = inner_emission_radius
if outer_emission_radius is None:
outer_emission_radius = mwxrun.rmax
self.outer_emission_radius = outer_emission_radius
self.area = (
np.pi
* (self.outer_emission_radius**2 - self.inner_emission_radius**2)
)
# Compute cell count
self.cell_count = (
(self.outer_emission_radius - self.inner_emission_radius)
/ mwxrun.dr
)
def _get_xv_coords(self, npart, m, rseed):
"""Get particle coordinates given particle number.
See :func:`mewarpx.emission.BaseEmitter.get_newparticles` for details.
"""
if rseed is not None:
nprstate = np.random.get_state()
np.random.seed(rseed)
rseedv = np.random.randint(1000000000)
rseedx = np.random.randint(1000000000)
else:
rseedv = None
rseedx = None
vx, vy, vz = mwxutil.get_velocities(
npart, self.T, m=m, transverse_fac=self.transverse_fac,
emission_type=self.emission_type, rseed=rseedv)
x, y, z = mwxutil.get_positions_RZ(
npart, rmin=self.inner_emission_radius,
rmax=self.outer_emission_radius, z=self.z,
rseed=rseedx)
# Flip z velocities for anode emission. This appears to be faster than
# an if statement for 10000 or fewer particles.
vz = -self.zsign * vz
if rseed is not None:
np.random.set_state(nprstate)
return x, y, z, vx, vy, vz
[docs] def get_normals(self, x, y, z):
"""Calculate local surface normal at specified coordinates.
Arguments:
x (np.ndarray): x-coordinates of emitted particles (in meters).
y (np.ndarray): y-coordinates of emitted particles (in meters).
z (np.ndarray): z-coordinates of emitted particles (in meters).
Returns:
normals (np.ndarray): nx3 array containing the outward surface
normal vector at each particle location.
"""
normals = np.zeros((len(x), 3))
normals[:, 2] = -self.zsign
return normals
[docs]class ZCylinderEmitter(Emitter):
"""This injects over the side faces of a cylinder oriented along z."""
geoms = ['RZ', 'XYZ']
def __init__(self, conductor, T=None, zmin=None, zmax=None, rdir=1,
transverse_fac=1.0, **kwargs):
"""Initialize a 3D cylindrical emitter oriented along the z-axis.
Arguments:
conductor (:class:`mewarpx.assemblies.Assembly`): Conductor object,
used to obtain work function, coordinates and possibly
temperature.
T (float): Temperature in Kelvin for the emitter; determines
velocities. If not specified the temperature of the conductor
will be used.
zmin (float): Lower z-coordinate for emitting surface. Default
conductor.zmin if it exists otherwise mwxrun.zmin.
zmax (float): Upper z-coordinate for emitting surface. Default
conductor.zmax if it exists otherwise mwxrun.zmax.
rdir (float): 1 for emitting outward of the cylinder (will use
r_outer attribute of the conductor for the emitting surface),
-1 for emitting inward towards r = 0 (will use r_inner attribute
of the conductor for the emitting surface). Default 1.
transverse_fac (float): Scale the transverse energy distribution by
this factor. Default 1. See
:func:`mewarpx.utils_store.util.get_velocities` for details.
kwargs (dict): Any other keyword arguments supported by the parent
Emitter constructor (such as "emission_type").
Notes:
The center of the cylinder is always x = y = 0 at present.
"""
# Default initialization
super(ZCylinderEmitter, self).__init__(T=T, conductor=conductor,
**kwargs)
self.zmin = zmin
if self.zmin is None:
if hasattr(conductor, 'zmin'):
self.zmin = conductor.zmin
else:
self.zmin = mwxrun.zmin
self.zmax = zmax
if self.zmax is None:
if hasattr(conductor, 'zmax'):
self.zmax = conductor.zmax
else:
self.zmax = mwxrun.zmax
self.rdir = int(round(rdir))
if self.rdir not in [-1, 1]:
raise ValueError("rdir must be +1 or -1 for z-cylinder emitters.")
if self.rdir == 1:
self.r = conductor.r_outer + 1e-10
else:
self.r = conductor.r_inner - 1e-10
self.transverse_fac = transverse_fac
# sanity check
if self.r <= 0:
raise AttributeError("Cannot emit from a cylinder with 0 radius.")
self.area = 2.0 * np.pi * self.r * (self.zmax - self.zmin)
# Compute cell count
if mwxrun.geom_str == 'RZ':
self.cell_count = (self.zmax - self.zmin) / mwxrun.dz
elif mwxrun.geom_str == 'XYZ':
self.cell_count = (
self.area
/ min(mwxrun.dx * mwxrun.dy, mwxrun.dx * mwxrun.dz,
mwxrun.dy * mwxrun.dz)
)
def _get_xv_coords(self, npart, m, rseed):
"""Get particle coordinates given particle number.
See :func:`mewarpx.emission.BaseEmitter.get_newparticles` for details.
"""
if rseed is not None:
nprstate = np.random.get_state()
np.random.seed(rseed)
# rseedv is passed to get velocities. The basic rseed here is used
# for positions, below.
rseedv = np.random.randint(1000000000)
else:
rseedv = None
theta = np.random.uniform(0.0, 2.0*np.pi, npart)
cos_theta = np.cos(theta)
sin_theta = np.sin(theta)
x = self.r * cos_theta
y = self.r * sin_theta
z = np.random.uniform(self.zmin, self.zmax, npart)
vz, v_trans, v_long = mwxutil.get_velocities(
npart, self.T, m=m, transverse_fac=self.transverse_fac,
emission_type=self.emission_type, rseed=rseedv)
# Flip the longitudinal velocity if needed. This appears to be faster
# than an if statement for 10000 or fewer particles.
v_long = self.rdir * v_long
vx = cos_theta * v_long - sin_theta * v_trans
vy = sin_theta * v_long + cos_theta * v_trans
if rseed is not None:
np.random.set_state(nprstate)
return x, y, z, vx, vy, vz
[docs] def get_normals(self, x, y, z):
"""Calculate local surface normal at specified coordinates.
Arguments:
x (np.ndarray): x-coordinates of emitted particles (in meters).
y (np.ndarray): y-coordinates of emitted particles (in meters).
z (np.ndarray): z-coordinates of emitted particles (in meters).
Returns:
normals (np.ndarray): nx3 array containing the outward surface
normal vector at each particle location.
"""
normals = np.zeros((len(x), 3))
# The normal is r-hat
r = np.sqrt(x**2 + y**2)
normals[:, 0] = x / r
normals[:, 1] = y / r
return normals
[docs]class ArbitraryEmitter2D(Emitter):
""" ArbitraryEmitter2D class takes in a conductor, calculates an approximate
surface that encloses the conductor and then sets up the appropriate
emitting surfaces, given a number of particles to emit.
"""
geoms = ['XZ']
def __init__(self, conductor, T=None, res_fac=5., transverse_fac=1.0,
**kwargs):
"""Construct the emitter based on conductor object and temperature.
Arguments:
conductor (mewarpx.assemblies object): Conductor to emit from.
T (float): Temperature in Kelvin. If not specified the temperature
of the conductor will be used.
res_fac (float): Level of resolution beyond the grid resolution to
use for calculating shape contours.
transverse_fac (float): Scale the transverse energy distribution by
this factor. Default 1. See
:func:`mewarpx.utils_store.util.get_velocities` for details.
kwargs (dict): Any other keyword arguments supported by the parent
Emitter constructor (such as "emission_type").
"""
# Default initialization
super(ArbitraryEmitter2D, self).__init__(
T=T, conductor=conductor, **kwargs
)
# Save input parameters
self.res_fac = res_fac
self.transverse_fac = transverse_fac
# Generate grid enclosed in bounding box
self.dx = mwxrun.dx/res_fac
self.dy = 1.
self.dz = mwxrun.dz/res_fac
self.dA = np.sqrt(self.dx*self.dz)
# A small delta is added to the maxima here; this ensures the last point
# is included. Without it, floating point errors determine whether or
# not the last point is included.
self.xvec = np.arange(
mwxrun.xmin, mwxrun.xmax + self.dx/1000., self.dx)
self.yvec = [0.]
self.zvec = np.arange(
mwxrun.zmin, mwxrun.zmax + self.dz/1000., self.dz)
[X, Y, Z] = np.squeeze(np.meshgrid(self.xvec, self.yvec, self.zvec,
indexing='xy'))
oshape = X.shape
X = X.flatten()
Y = Y.flatten()
Z = Z.flatten()
inside = np.reshape(
self.conductor.isinside(X, Y, Z, aura=self.dA/5.),
oshape)
# level of 0.17 was chosen to keep the original ratio of 0.5:3 from warp
# compared to 0.17:1 now in warpx
# increasing the level causes particles to be injected inside cylinder
self.contours = np.squeeze(skimage.measure.find_contours(
inside, 0.17))
self.contours[:, 0] = np.interp(self.contours[:, 0],
np.arange(self.xvec.size),
self.xvec)
self.contours[:, 1] = np.interp(self.contours[:, 1],
np.arange(self.zvec.size),
self.zvec)
self.centers = np.array(
[(self.contours[1:, 0] + self.contours[:-1, 0])/2.,
(self.contours[1:, 1] + self.contours[:-1, 1])/2.]).T
self.dvec = np.array(
[self.contours[1:, 0] - self.contours[:-1, 0],
self.contours[1:, 1] - self.contours[:-1, 1]]).T
# Calculate the distance of each segment & sum to calculate the area
self.distances = np.sqrt(self.dvec[:, 0]**2 + self.dvec[:, 1]**2)
self.area = sum(self.distances)
self.cell_count = self.area / min(mwxrun.dx, mwxrun.dz)
self.CDF = np.cumsum(self.distances)/self.area
# Calculate Normal Vector by taking cross product with y-hat
ndvec = self.dvec/np.tile(self.distances, (2, 1)).T
marching_normal = np.zeros(self.dvec.shape)
marching_normal[:, 0] = -ndvec[:, 1]
marching_normal[:, 1] = ndvec[:, 0]
# Check to make sure normal plus center is outside of conductor
partdist = self.dA * float(self.res_fac) / 2.
pos = self.centers + marching_normal * partdist
px = pos[:, 0]
py = np.zeros_like(px)
pz = pos[:, 1]
nhat = self.conductor.calculatenormal(px, py, pz)
self.normal = nhat[[0, 2], :].T
def _get_xv_coords(self, npart, m, rseed):
"""Get particle coordinates given particle number.
See :func:`mewarpx.emitter.get_newparticles` for details.
"""
if rseed is not None:
nprstate = np.random.get_state()
np.random.seed(rseed)
# rseedv is passed to get velocities. The basic rseed here is used
# for positions, below.
rseedv = np.random.randint(1000000000)
else:
rseedv = None
# Draw Random Numbers to determine which face to emit from
self.contour_idx = np.searchsorted(self.CDF, np.random.rand(npart))
vels = np.column_stack(mwxutil.get_velocities(
num_samples=npart, T=self.T, m=m,
rseed=rseedv,
transverse_fac=self.transverse_fac,
emission_type=self.emission_type
))
# Rotate velocities based on angle of normal
newvels = self.convert_vel_zhat_nhat(
vels, self.normal[self.contour_idx])
vx = np.asarray(newvels[:, 0], order="C")
vy = np.asarray(newvels[:, 1], order="C")
vz = np.asarray(newvels[:, 2], order="C")
# Now get positions
pos1 = self.contours[self.contour_idx, :]
positions = (pos1 +
(np.tile(np.random.rand(npart), (2, 1)).T
* self.dvec[self.contour_idx, :]))
x = np.asarray(positions[:, 0], order="C")
y = np.asarray([0.]*npart, order="C")
z = np.asarray(positions[:, 1], order="C")
if rseed is not None:
np.random.set_state(nprstate)
return x, y, z, vx, vy, vz
[docs] @staticmethod
# Synthetic tests showed 18 ms to 660us change from using np.dot +
# numba compilation. Without these changes, this function was taking 2-4% of
# some run times so the improvement is warranted.
@numba.jit(nopython=True)
def convert_vel_zhat_nhat(vels, nhat):
"""Create a rotation matrix for Zhat to Nhat"""
Zhat = np.array([0., 1.])
newvels = np.zeros(vels.shape)
for ii in range(vels.shape[0]):
Cvec = Zhat - nhat[ii, :]
Cvec2 = np.dot(Cvec, Cvec)
theta = np.arccos(1. - Cvec2/2.)
# Check to see if normal is pointing toward -xhat
# Resolves angle ambiguity in law of cosines
if nhat[ii, 0] < 0.:
theta = -theta
# Rotate in XZ plane, keeping Y the same
R = np.array([[np.cos(theta), 0., np.sin(theta)],
[0., 1., 0.],
[-np.sin(theta), 0., np.cos(theta)]])
newvels[ii, :] = np.dot(R, vels[ii, :])
return newvels
[docs] def get_normals(self, x, y, z):
"""Calculate local surface normal at specified coordinates.
Arguments:
x (np.ndarray): x-coordinates of emitted particles (in meters).
y (np.ndarray): y-coordinates of emitted particles (in meters).
z (np.ndarray): z-coordinates of emitted particles (in meters).
Returns:
normals (np.ndarray): nx3 array containing the outward surface
normal vector at each particle location.
"""
# Since we've already pre-computed all the normals and already picked
# the right ones during the call to _get_xv_coords(), we can ignore the
# coordinate arguments here entirely and use the recently saved
# "contour_idx" values for indexing the pre-tabulated normals. To
# prevent this from being abused, we'll first check that the length of
# the coordinate lists matches that of the contour_idx list.
if len(x) != len(self.contour_idx):
raise ValueError('Length of particle coordinate list does not match'
+ ' the most recent number of emitted particles!')
normals = np.zeros((len(x), 3))
normals[:, 0] = self.normal[self.contour_idx, 0]
normals[:, 2] = self.normal[self.contour_idx, 1]
return normals
[docs] def plot_contours(self):
"""Plots the contours generated for the assembly object and the
assembly object. The object is plotted in yellow, and the contours
are plotted in blue. The plot is saved in contours.png"""
# calculate which tiles are inside of assembly object
self.xvec = np.arange(
mwxrun.xmin, mwxrun.xmax + self.dx/1000., self.dx)
self.yvec = [0.]
self.zvec = np.arange(
mwxrun.zmin, mwxrun.zmax + self.dz/1000., self.dz)
[X, Y, Z] = np.squeeze(np.meshgrid(self.xvec, self.yvec, self.zvec,
indexing='xy'))
oshape = X.shape
X = X.flatten()
Y = Y.flatten()
Z = Z.flatten()
inside = np.reshape(
self.conductor.isinside(X, Y, Z, aura=self.dA/5.),
oshape)
contours = np.array(skimage.measure.find_contours(inside, 0.17))
# plot assembly object first
assembly_cmap = colors.LinearSegmentedColormap.from_list(
'my_cmap', ['white', '#66c2a5'], 256)
fig, ax = plt.subplots()
ax.imshow(inside, cmap=assembly_cmap, origin="lower")
# plot contours
for contour in contours:
ax.plot(contour[:, 1], contour[:, 0], linewidth=2, color="#fc8d62")
# set title and labels
ax.set_title(f"{self.conductor.name} contour plot")
x_range = [self.res_fac * mwxrun.zmin / mwxrun.dz, self.res_fac * mwxrun.zmax / mwxrun.dz]
y_range = [self.res_fac * mwxrun.xmin / mwxrun.dx, self.res_fac * mwxrun.xmax / mwxrun.dx]
x_step = mwxrun.dz / (self.res_fac)
y_step = mwxrun.dx / (self.res_fac)
minor_xticks = np.linspace(x_range[0], x_range[1], mwxrun.nz)
minor_yticks = np.linspace(y_range[0], y_range[1], mwxrun.nx)
major_xticks = np.linspace(x_range[0], x_range[1], 5)
major_yticks = np.linspace(y_range[0], y_range[1], 5)
ax.set_xlabel("Z (m)")
ax.set_ylabel("X (m)")
ax.set_xticks(major_xticks)
ax.set_xticks(minor_xticks, minor=True)
ax.set_xticklabels(np.round(major_xticks * x_step, 8), rotation=45)
ax.set_yticks(major_yticks)
ax.set_yticks(minor_yticks, minor=True)
ax.set_yticklabels(np.round(major_yticks * y_step, 8))
ax.grid(visible=True, which="minor")
ax.set_aspect(mwxrun.dx/mwxrun.dz, adjustable='box')
fig.tight_layout()
fig.savefig(f"{self.conductor.name}_contour_plot.png")
[docs]class VolumeEmitter(BaseEmitter):
"""Parent class for volumetric particle injection coordinates.
- ``volume`` gives the spatial volume in m^3
- ``_get_x_coords()`` implements the subclass-specific particle
injection logic
"""
volume = 0
geoms = ['Z', 'XZ', 'RZ', 'XYZ']
def __init__(self, T, xmin=None, xmax=None, ymin=None, ymax=None,
zmin=None, zmax=None, rmin=None, rmax=None):
"""Initialize emitter boundaries. A rectangular or cylindrical emitter
volume is supported. If x & y boundaries are specified the r boundaries
will be ignored and vice versa. If both x & y and r boundaries are
specified an AttributeError will be raised. If no boundaries are given
the simulation geometry and boundaries will be used.
Arguments:
T (float): Emitter temperature in Kelvin. Determines particle
velocity distribution.
x/y/z/rmin (float): Lower boundary of the volume.
x/y/z/rmax (float): Upper boundary of the volume.
"""
super(VolumeEmitter, self).__init__()
self.T = T
# determine default prism type from simulation geometry
self.rectangular = mwxrun.geom_str != 'RZ'
# check if a different volume was specified
r_bounds_given = (rmin is not None or rmax is not None)
xy_bounds_given = (
xmin is not None or xmax is not None or ymin is not None or
ymax is not None
)
if r_bounds_given and xy_bounds_given:
raise AttributeError(
"Both rectangular and cylindrical boundaries specified for a "
"VolumeEmitter"
)
if r_bounds_given:
self.rectangular = False
if xy_bounds_given:
self.rectangular = True
self.bounds = np.zeros((3, 2))
if self.rectangular:
for ii, (lim, defaultlim) in enumerate(
zip([xmin, xmax, ymin, ymax, zmin, zmax],
[mwxrun.xmin, mwxrun.xmax, mwxrun.ymin,
mwxrun.ymax, mwxrun.zmin, mwxrun.zmax])
):
if lim is None:
lim = defaultlim
self.bounds[ii // 2, ii % 2] = lim
self.volume = np.prod(self.bounds[:, 1] - self.bounds[:, 0])
# handle cylindrical case
else:
for ii, (lim, defaultlim) in enumerate(
zip([rmin, rmax, 0, 2.0*np.pi, zmin, zmax],
[mwxrun.rmin, mwxrun.rmax, 0, 2.0*np.pi,
mwxrun.zmin, mwxrun.zmax])
):
if lim is None:
lim = defaultlim
self.bounds[ii // 2, ii % 2] = lim
self.volume = (
np.pi * (self.bounds[0, 1]**2 - self.bounds[0, 0]**2)
* (self.bounds[2, 1] - self.bounds[2, 0])
)
# Note the negation here will catch nans, checking <= 0 won't.
if not (self.volume > 0):
raise RuntimeError("Invalid warpX geometry limits.")
[docs] def getvoltage(self):
"""Ideally this is probably the local potential, but default to 0."""
return 0.
[docs] def getvoltage_e(self):
"""Ideally this is probably the local potential, but default to 0."""
return self.getvoltage()
def _get_xv_coords(self, npart, m, rseed):
"""Get velocities and call specialized function for position."""
if rseed is not None:
nprstate = np.random.get_state()
np.random.seed(rseed)
# rseedv is passed to get velocities. The basic rseed here is used
# for positions, below.
rseedv = np.random.randint(1000000000)
else:
rseedv = None
x_coords = self._get_x_coords(npart)
v_coords = mwxutil.get_velocities(
npart, self.T, m=m, emission_type='random', rseed=rseedv
)
if rseed is not None:
np.random.set_state(nprstate)
return (
x_coords[:, 0], x_coords[:, 1], x_coords[:, 2],
v_coords[0], v_coords[1], v_coords[2]
)
[docs]class ZSinDistributionVolumeEmitter(VolumeEmitter):
"""Vary density in z as a half-period sin wave."""
def _get_x_coords(self, npart):
"""Get coordinates with sin distribution.
rseed, if used, is handled by the parent function.
"""
if self.rectangular:
xpos = np.random.uniform(self.bounds[0, 0], self.bounds[0, 1],
npart)
ypos = np.random.uniform(self.bounds[1, 0], self.bounds[1, 1],
npart)
# handle cylindrical case
else:
r = np.sqrt(np.random.uniform(
self.bounds[0, 1]**2, self.bounds[0, 0]**2, npart
))
theta = np.random.uniform(self.bounds[1, 0], self.bounds[1, 1],
npart)
xpos = r * np.cos(theta)
ypos = r * np.sin(theta)
z_random_draw = np.random.random(npart)
zpos = (
np.arccos(1 - 2.0*z_random_draw) / np.pi
* (self.bounds[2, 1] - self.bounds[2, 0])
+ self.bounds[2, 0]
)
return np.array([xpos, ypos, zpos]).T
[docs]class XGaussZSinDistributionVolumeEmitter(VolumeEmitter):
"""Vary density in z as a half-period sin wave, and density in x as a
Gaussian."""
geoms = ['XZ']
def __init__(self, x_sigma, *args, **kwargs):
super(XGaussZSinDistributionVolumeEmitter, self).__init__(
*args, **kwargs)
if x_sigma <= 0:
raise ValueError(
f"x_sigma must be greater than 0, not {x_sigma}."
)
# Domain width in x
x_width = self.bounds[0, 1] - self.bounds[0, 0]
# The number of standard deviations to truncate at, half the domain
# away from the center of the simulation
trunc = (x_width/2.0) / x_sigma
mu = (self.bounds[0, 1] + self.bounds[0, 0])/2.0
self.truncnorm = scipy.stats.truncnorm(a=-trunc, b=trunc,
loc=mu, scale=x_sigma)
def _get_x_coords(self, npart):
"""Get coordinates with sin distribution in z and Guassian in x.
rseed, if used, is handled by the parent function.
"""
xpos = self.truncnorm.rvs(npart)
ypos = np.random.uniform(self.bounds[1, 0], self.bounds[1, 1], npart)
z_random_draw = np.random.random(npart)
zpos = (
np.arccos(1 - 2.0*z_random_draw) / np.pi
* (self.bounds[2, 1] - self.bounds[2, 0])
+ self.bounds[2, 0]
)
return np.array([xpos, ypos, zpos]).T
[docs]class ArbitraryDistributionVolumeEmitter(VolumeEmitter):
"""Varies density based on given particle density array. NPPC for each
species must be an integer greater than or equal to 1."""
geoms = ["XZ"]
def __init__(self, d_grid, *args, **kwargs):
"""
Arguments:
d_grid (np.ndarray): array of particle densities given in cm^-3
"""
super(ArbitraryDistributionVolumeEmitter, self).__init__(
*args, **kwargs)
# interpolate density onto simulation grid and integrate
interp_dgrid = self._interpolate_grid(d_grid)
self.d_grid = (interp_dgrid * mwxrun.dx * mwxrun.dz
* (mwxrun.ymax - mwxrun.ymin)
* 1e6).ravel()
self.add_wfn(self._weight_function)
@staticmethod
def _interpolate_grid(input_grid):
"""Takes a grid (2d array) as an input and interpolates the values on
to the current simulation grid.
Arguments:
input_grid (np.ndarray): 2d array of any shape representing a
2d grid of plasma densities
Returns:
output_grid (np.ndarray): 2d array of current simulation dimensions
interpolated values from input_grid
"""
input_shape = input_grid.shape
# pad edges with current edge values by 1 to avoid NaN when interpolating
input_x = np.linspace(-0.5, input_shape[0] + 0.5, input_shape[0] + 2)
input_z = np.linspace(-0.5, input_shape[1] + 0.5, input_shape[1] + 2)
input_grid = np.pad(input_grid, (1,), "reflect", reflect_type="odd")
input_zz, input_xx = np.meshgrid(input_z, input_x)
input_xx = input_xx * mwxrun.nx/input_shape[0]
input_zz = input_zz * mwxrun.nz/input_shape[1]
input_coord = np.column_stack([input_xx.flatten(), input_zz.flatten()])
output_x = np.linspace(0.5, mwxrun.nx - 0.5, mwxrun.nx)
output_z = np.linspace(0.5, mwxrun.nz - 0.5, mwxrun.nz)
output_zz, output_xx = np.meshgrid(output_z, output_x)
output_grid = scipy.interpolate.griddata(
input_coord, input_grid.flatten(), (output_xx, output_zz)
)
return output_grid
def _get_x_coords(self, npart):
"""Uniformly samples particles in each grid cell, so each cell has the
same number of particles."""
nppc = npart / (mwxrun.nx * mwxrun.nz)
if not nppc.is_integer() or nppc < 1:
raise ValueError(f"When using ArbitraryDistributionVolumeEmitter, "
f"number of particles per cell per species must "
f"be an integer greater than or equal to 1. "
f"Currently it is {nppc}.")
random_dx = np.random.uniform(-mwxrun.dx / 2, mwxrun.dx / 2, npart)
random_dz = np.random.uniform(-mwxrun.dz / 2, mwxrun.dz / 2, npart)
x = np.linspace(self.bounds[0, 0] + mwxrun.dx/2, self.bounds[0, 1] - mwxrun.dx/2, mwxrun.nx)
z = np.linspace(self.bounds[2, 0] + mwxrun.dz/2, self.bounds[2, 1] - mwxrun.dz/2, mwxrun.nz)
zz, xx = np.meshgrid(z, x)
xx = np.tile(xx.ravel(), int(nppc)) + random_dx
zz = np.tile(zz.ravel(), int(nppc)) + random_dz
yy = np.zeros_like(xx)
return np.array([xx, yy, zz]).T
def _weight_function(self, particle_dict):
"""Calculates the weight of each particle based on their position and
the given seed density grid.
Arguments:
particle_dict (dict): Contains lists, each with length equal to the
number of particles
Returns:
w (np.ndarray): flattened array of particle weights
"""
# create a bin for each grid cell
x_bin = np.linspace(mwxrun.xmin, mwxrun.xmax, mwxrun.nx + 1)
z_bin = np.linspace(mwxrun.zmin, mwxrun.zmax, mwxrun.nz + 1)
# bin particles based on position
ret = scipy.stats.binned_statistic_2d(
particle_dict["x"], particle_dict["z"], None, "count",
bins=[x_bin, z_bin], expand_binnumbers=True
)
bin_counts = ret.statistic.flatten()
# calculate weight per particle per cell
w_grid = np.divide(self.d_grid, bin_counts,
out=np.zeros_like(self.d_grid),
where=bin_counts != 0)
# apply particle weights to each particle
w = w_grid[(ret.binnumber[0, :] - 1) * mwxrun.nz + ret.binnumber[1, :] - 1]
return w