Source code for mewarpx.utils_store.util

"""
Utility functions for mewarpx.
"""
import collections
import errno
import inspect
import logging
import os
import warnings

import numpy as np

import mewarpx
from mewarpx.utils_store import mwxconstants as constants
from pywarpx import geometry

logger = logging.getLogger(__name__)

# http://stackoverflow.com/questions/50499/in-python-how-do-i-get-the-path-and-name-of-the-file-t
mewarpx_dir = os.path.join(os.path.dirname(os.path.abspath(
    inspect.getfile(inspect.currentframe()))), "..")


[docs]def check_version(script_version, script_physics_version): """Check that a run script is compatible with this mewarpx. If this mewarpx distribution is *lower* than that of the run script, throw an error. If this mewarpx distribution API or physics version is higher than that of the run script, throw a warning. Else do nothing. Arguments: script_version (tuple): A tuple of ints representing the mewarpx version of the run script: (API_version, feature_version, patch_version). script_physics_version (int): Integer representing the physics version of the run script. """ mewarpx_version = mewarpx.__version_info__ mewarpx_physics_version = mewarpx.__physics_version__ # Tuple comparison works well in Python and does what we want! if mewarpx_version < script_version: raise ValueError( f"This version of mewarpx {mewarpx_version} is older than the " f"version {script_version} this script was designed for." ) # I'm not sure of any instance where mewarpx physics version would be < # script physics version but software version would not be, but still # safest to do the check. if mewarpx_physics_version < script_physics_version: raise ValueError( f"This physics version of mewarpx {mewarpx_physics_version} is " f"older than the version {script_physics_version} this script was " "written for." ) # Warnings only printed if API or physics versions are out of date. if mewarpx_version[0] > script_version[0]: logger.warning( f"This version of mewarpx {mewarpx_version} is a newer API " f"version than the version {script_version} this script was " "designed for. Incompatibilities may be present." ) if mewarpx_physics_version > script_physics_version: logger.warning( f"This physics version of mewarpx {mewarpx_physics_version} is " f"newer than the version {script_physics_version} this script was " "written for. Results may be different now." )
[docs]def get_velocities(num_samples, T, m, emission_type='thermionic', transverse_fac=1.0, rseed=None): """Generate array of random [vx, vy, vz] for cathode-emitted electrons. Arguments: num_samples (int): Number of particles to generate velocities for T (float): Temperature for the electrons (usually material temp) (K) m (float): Mass of elementary particle (kg) emission_type (str): Use "thermionic" for a thermionic emitter oriented along +zhat, and use "random" for a purely thermal distribution with no preferred direction. "half_maxwellian" is used at present for surface ionization, again along +zhat. Defaults to "thermionic". transverse_fac (float): Scale the particle's x and y average energies by this factor, scales z average energy to conserve total average particle energy in the distribution. Default 1., Min 0., Max 2. rseed (positive int): If specified, seed the random number generator. Used for testing. The random number generator is set back at the end of the function. Returns: velocities (np.ndarray): array of shape (num_samples, 3) with (vx, vy, vz) for each electron. """ if (emission_type != 'thermionic') and not np.isclose(transverse_fac, 1.0): return ValueError('transverse_fac is a support argument only for ' 'thermionic emissiion models!') if rseed is not None: nprstate = np.random.get_state() np.random.seed(rseed) sigma = np.sqrt(constants.kb_J * T / m) if transverse_fac < 0.: warnings.warn('WARNING: transverse_fac is out of bounds\n' 'Constraining to minimum value of 0.') beta = 0. elif transverse_fac > 2.: warnings.warn('WARNING: transverse_fac is out of bounds\n' 'Constraining to maximum value of 2.') beta = np.sqrt(2.) else: beta = np.sqrt(transverse_fac) alpha = np.sqrt(2. - beta**2.) # vx and vy follow Maxwellian distributions vx = sigma * np.random.randn(num_samples) * beta vy = sigma * np.random.randn(num_samples) * beta if emission_type == 'random': vz = sigma * np.random.randn(num_samples) * beta elif emission_type == 'thermionic': # vz is truncated with k_B*T/m << Phi assumed. See # "Emission Distribution from a Thermionic Cathode" document # on Bloomfire. P = np.random.rand(num_samples) vz = np.sqrt(-2 * sigma**2 * np.log(1 - P)) * alpha elif emission_type == 'half_maxwellian': vz = np.abs(sigma * np.random.randn(num_samples) * beta) else: raise ValueError(f'Unsupported emission type "{emission_type}".') if rseed is not None: np.random.set_state(nprstate) return vx, vy, vz
[docs]def get_positions(num_samples, xmin, xmax, ymin=0, ymax=0, z=0, rseed=None): """Provide random samples of [x, y, z] for electrons in simulation. In x and y, positions are uniformly distributed. In z, positions are placed at the emitter. Arguments: num_samples (int): Number of particles to generate positions for xmin (float): Min position in x (meters) xmax (float): Max position in x (meters) ymin (float): Min position in y (meters) ymax (float): Max position in y (meters) z (float): Position of the emitter on the z-axis (meters) rseed (positive int): If specified, seed the random number generator. Used for testing. The random number generator is set back at the end of the function. Returns: positions (np.ndarray): Array of shape (num_samples, 3) with positions. """ if rseed is not None: nprstate = np.random.get_state() np.random.seed(rseed) # Random x and y positions x = np.random.uniform(xmin, xmax, num_samples) y = np.random.uniform(ymin, ymax, num_samples) z = np.ones_like(x) * z if rseed is not None: np.random.set_state(nprstate) return x, y, z
[docs]def get_positions_RZ(num_samples, rmin, rmax, theta_min=0, theta_max=(2*np.pi), z=0, rseed=None): """Provide random samples of [x, y, z] for electrons in simulation. Positions are uniformly distributed in r. In z, positions are placed at the emitter. Arguments: num_samples (int): Number of particles to generate positions for rmin (float): Min position in r (meters) rmax (float): Max position in r (meters) theta_min (float): Min angle (radians) theta_max (float): Max angle (radians) z (float): Position of the emitter on the z-axis (meters) rseed (positive int): If specified, seed the random number generator. Used for testing. The random number generator is set back at the end of the function. Returns: positions (np.ndarray): Array of shape (num_samples, 3) with positions. """ if rseed is not None: nprstate = np.random.get_state() np.random.seed(rseed) # Random r and theta positions r = np.sqrt(np.random.uniform(rmin**2, rmax**2, num_samples)) theta = np.random.uniform(theta_min, theta_max, num_samples) # Transform to x and y x = r * np.cos(theta) y = r * np.sin(theta) # Fixed z z = np.ones_like(x) * z if rseed is not None: np.random.set_state(nprstate) return x, y, z
[docs]def return_iterable(x, depth=1): """Return x if x is iterable, None if x is None, [x] otherwise. Useful for arguments taking either a list or single value. Strings are a special case counted as 'not iterable'. Arguments: depth (int): This many levels must be iterable. So if you need an iterable of an iterable, this is 2. """ if x is None: return None elif depth > 1: # First make sure it's iterable to one less than the required depth. x = return_iterable(x, depth=depth-1) # Now check that it's iterable to the required depth. If not, we just # need to nest it in one more list. x_flattened = x while depth > 1: if all([(isinstance(y, collections.abc.Iterable) and not isinstance(y, str)) for y in x_flattened]): x_flattened = [z for y in x_flattened for z in y] depth -= 1 else: return [x] return x elif isinstance(x, str): return [x] elif isinstance(x, collections.abc.Iterable): return x else: return [x]
# https://stackoverflow.com/questions/600268/mkdir-p-functionality-in-python
[docs]def mkdir_p(path): """Make directory and parent directories if they don't exist. Do not throw error if all directories already exist. """ try: os.makedirs(path) except OSError as exc: if exc.errno == errno.EEXIST and os.path.isdir(path): pass else: raise
[docs]def ideal_gas_density(p, T): """Calculate neutral gas density (in 1/m^3) from the ideal gas law using pressure in Torr. Arguments: p (float): Gas pressure (Torr) T (float): Mean gas temperature (K) Returns: N (float): Number density of gas atoms/molecules (1/m^3) """ return (p * constants.torr_cgs) / (constants.kb_cgs * T) * 1e6
[docs]def J_RD(T, WF, A): """Returns the Richardson-Dushmann thermionic emission given a temperature and effective work function. Constant coefficient of emission (A) is assumed. Arguments: T (float): temperature of the cathode in K WF (float): work function of the cathode in eV A (float): coefficient of emission in Amp/m^2/K^2 Returns: J (float): current density in Amp/m^2 """ return A*T**2*np.exp(-1.*WF/(constants.kb_eV*T))
[docs]def recursive_update(d, u): """Recursively update dictionary d with keys from u. If u[key] is not a dictionary, this works the same as dict.update(). If u[key] is a dictionary, then update the keys of that dictionary within d[key], rather than replacing the whole dictionary. """ # https://stackoverflow.com/questions/3232943/update-value-of-a-nested-dictionary-of-varying-depth for k, v in u.items(): if isinstance(v, collections.Mapping): d[k] = recursive_update(d.get(k, {}), v) else: d[k] = v return d
[docs]def plasma_Debye_length(T, n): """Returns the thermal Debye length of a plasma. Arguments: T (float): plasma temperature in K n (float): plasma density in m^-3 Returns: lambda (float): Debye length in m """ return np.sqrt( constants.epsilon_0 * constants.kb_J * T / (n * constants.e**2) )
[docs]def get_vel_vector(v_mag): """Function that returns a random velocity vector given a magnitude. The random point on a unit sphere is found according to https://math.stackexchange.com/questions/44689/how-to-find-a-random- axis-or-unit-vector-in-3d. """ theta = np.random.rand(*v_mag.shape) * 2.0 * np.pi z = 2.0 * np.random.rand(*v_mag.shape) - 1.0 vel_vectors = np.zeros(v_mag.shape + (3,)) vel_vectors[..., 0] = np.sqrt(1.0 - z**2) * np.cos(theta) vel_vectors[..., 1] = np.sqrt(1.0 - z**2) * np.sin(theta) vel_vectors[..., 2] = z vel_vectors = v_mag[..., None] * vel_vectors # assert(np.allclose(v_mag, np.sqrt(np.sum(vel_vectors**2, axis=1)))) return vel_vectors
[docs]def interpolate_from_grid(coords, grid): """Function to interpolate from grid quantities to given coordinates. Arguments: coords (np.array): Numpy array of coordinates of points where the grid values should be interpolated, with shape (dim, n) where dim is the simulation dimension and n the number of points to interpolate. grid (np.array): Numpy array holding the grid point values (on nodes of the grid). Returns: fpos (np.array): Numpy array of length n holding the interpolated values for each coordinate given. """ mwxrun = mewarpx.mwxrun.mwxrun # sanity checks if coords.shape[0] != mwxrun.dim: raise AttributeError( f"There were {coords.shape[0]} coordinate values given but the " f"simulation has {mwxrun.dim} dimensions." ) n = coords.shape[1] lower_node = np.zeros(coords.shape).astype(int) weights = np.zeros(coords.shape) # Build up coordinate (indexes, spacing, and bounds) # all supported dimensions have a z coordinate coord_specs = [(mwxrun.coord_map['z'], mwxrun.dz, mwxrun.zmin)] if mwxrun.dim > 1: if mwxrun.geom_str == 'RZ': coord_specs.append((mwxrun.coord_map['r'], mwxrun.dr, mwxrun.rmin)) else: coord_specs.append((mwxrun.coord_map['x'], mwxrun.dx, mwxrun.xmin)) if mwxrun.dim == 3: coord_specs.append((mwxrun.coord_map['y'], mwxrun.dy, mwxrun.ymin)) # Iterate over all dimensions for idx, dx, xmin in coord_specs: x_grid = (coords[idx] - xmin) / dx lower_node[idx] = np.floor(x_grid).astype(int) weights[idx] = 1.0 - (x_grid - lower_node[idx]) fpos = np.zeros(n) if mwxrun.dim == 1: fpos += grid[lower_node[0]] * weights[0] fpos += grid[lower_node[0] + 1] * (1.0 - weights[0]) elif mwxrun.dim == 2: fpos += grid[lower_node[0], lower_node[1]] * weights[0] * weights[1] fpos += grid[lower_node[0] + 1,lower_node[1]] * (1.0 - weights[0]) * weights[1] fpos += grid[lower_node[0], lower_node[1] + 1] * weights[0] * (1.0 - weights[1]) fpos += grid[lower_node[0] + 1, lower_node[1] + 1] * (1.0 - weights[0]) * (1.0 - weights[1]) elif mwxrun.dim == 3: raise NotImplementedError("XYZ interpolation not implemented yet.") return fpos
[docs]def mwx_round(x, base=1): """Rounding function that allows rounding around a base value. Arguments: x (float): value to round base (float): base number to round around Returns: val (Union[int, float]): rounded value """ return base * np.round(x / base)