#!/usr/bin/env python
# -*- coding: utf-8 -*-
# 3rd party imports
import numpy as np
__author__ = "Louis Richard"
__email__ = "louisr@irfu.se"
__copyright__ = "Copyright 2020-2023"
__license__ = "MIT"
__version__ = "2.4.2"
__status__ = "Prototype"
mass_and_charge = {
"hydrogen+": [1.04535e-2, 1],
"helium+": [4.18138e-2, 1],
"helium++": [4.18138e-2, 2],
"oxygen+": [0.167255, 1],
"oxygen++": [0.167255, 2],
}
def _get_energy(inp, dim):
energy_len = len(inp.ccomp.data)
energy_reform = np.reshape(inp.ccomp.data, [energy_len, 1, 1])
# repeated across theta
energy_rebin1 = np.repeat(energy_reform, dim[1], axis=2)
out_energy = np.repeat(energy_rebin1, dim[2], axis=1)
return out_energy
def _get_theta(inp, azimuth_dim):
energy_len = len(inp.ccomp.data)
# elevations are constant across time
# convert colat -> lat
theta_len = len(inp.rcomp.data)
theta_reform = 90.0 - np.reshape(inp.rcomp.data, [1, 1, theta_len])
# in the IDL code, we use reform to repeat the vector above
# here, we'll do the same thing with np.repeat
# repeated across phi
theta_rebin1 = np.repeat(theta_reform, azimuth_dim[1], axis=1)
# repeated across energy
out_theta = np.repeat(theta_rebin1, energy_len, axis=0)
return out_theta
def _get_phi(azimuth, full):
# get azimuth
# -shift from time-azi.-elev.-en. to time-en.-azi.-elev.
out_phi = azimuth.data[full, :, :, :]
if out_phi.ndim == 4:
out_phi = out_phi.transpose([0, 3, 1, 2])
elif out_phi.ndim == 3:
out_phi = out_phi.transpose([2, 0, 1])
return out_phi
def _get_dphi(azimuth, full, out_phi, inp):
energy_len = len(inp.ccomp.data)
theta_len = len(inp.rcomp.data)
phi_len = azimuth.shape[1]
# get dphi
# - use median distance between subsequent phi measurements within each
# distribution (median is used to discard large differences across 0=360)
# - preserve dimensionality in case differences arise across energy or
# elevation
if out_phi.ndim == 4:
out_dphi = np.median(np.diff(azimuth.data[full, ...], axis=1), axis=1)
out_dphi = np.transpose(out_dphi, [0, 2, 1])
dphi_reform = np.reshape(
out_dphi,
[full.size, energy_len, theta_len, 1],
)
out_dphi = np.repeat(dphi_reform, phi_len, axis=3)
elif out_phi.ndim == 3:
out_dphi = np.median(np.diff(azimuth.data[full, ...], axis=1), axis=0)
out_dphi = out_dphi.transpose([1, 0])
dphi_reform = np.reshape(out_dphi, [energy_len, theta_len, 1])
out_dphi = np.repeat(dphi_reform, phi_len, axis=2)
else:
raise TypeError("Invalid shape of out_phi")
return out_dphi
[docs]def get_hpca_dist(inp, azimuth):
r"""Returns pseudo-3D particle data structures containing mms hpca data
for use with spd_slice2d.
Parameters
----------
inp : xarray.DataArray
HPCA ion spec
azimuth : xarray.DataArray
Azimuthal angles.
Returns
-------
out : dict
HPCA distribution
"""
# check if the time series is monotonic to avoid doing incorrect
# calculations when there's a problem with the CDF files
time_data = azimuth.time.data
n_mono = np.where(time_data[1:] <= time_data[:-1])
assert len(n_mono[0]) == 0, "non-monotonic data found in the Epoch_Angles"
# find azimuth times with complete 1/2 spins of particle data this is
# used to determine the number of 3D distributions that will be created
# and where their corresponding data is located in the particle data
# structure
n_times = len(azimuth.data[0, 0, :, 0])
data_idx = np.searchsorted(inp.time.data, time_data) - 1
full = np.argwhere((data_idx[1:] - data_idx[:-1]) == n_times)
assert len(full) != 0, "Azimuth data does not cover current time range"
# filter times when azimuth data is all zero
# -just check the first energy & elevation
# -assume azimuth values are positive
n_expected = azimuth.data[full, 0, 0, :].size
n_valid_az = np.count_nonzero(azimuth.data[full, 0, 0, :])
assert n_valid_az == n_expected, "zeroes found in the azimuth array"
full = np.squeeze(full)
data_idx = data_idx[full].flatten()
# Initialize energies, angles, and support data
# final dimensions for a single distribution (energy-azimuth-elevation)
azimuth_dim = azimuth.data.shape
dim = (azimuth_dim[3], azimuth_dim[2], azimuth_dim[1])
specie = inp.attrs["FIELDNAM"].split(" ")[0].lower()
mass, charge = mass_and_charge[specie]
out_bins = np.ones(dim) + 1
out_denergy = np.zeros(dim)
energy_len = len(inp.ccomp.data)
theta_len = len(inp.rcomp.data)
phi_len = azimuth_dim[1]
# energy bins are constant
out_energy = _get_energy(inp, dim)
out_theta = _get_theta(inp, azimuth_dim)
out_dtheta = np.zeros([energy_len, theta_len, azimuth_dim[2]]) + 22.5
out_phi = _get_phi(azimuth, full)
out_dphi = _get_dphi(azimuth, full, out_phi, inp)
out_data = np.zeros((full.size, dim[0], dim[1], dim[2]))
# copy particle data
for i in range(full.size):
# need to extract the data from the center of the half-spin
if data_idx[i] - n_times / 2.0 >= 0:
start_idx = int(data_idx[i] - n_times / 2.0)
else:
continue
if data_idx[i] + n_times / 2.0 - 1 < len(inp.time):
end_idx = int(data_idx[i] + n_times / 2.0)
else:
continue
out_data[i, ...] = np.transpose(
inp.data[start_idx:end_idx, :, :],
[2, 0, 1],
)
out = {
"data": out_data,
"bins": out_bins,
"theta": out_theta,
"phi": out_phi,
"energy": out_energy,
"dtheta": out_dtheta,
"dphi": out_dphi,
"denergy": out_denergy,
"n_energy": energy_len,
"n_theta": theta_len,
"n_phi": phi_len,
"n_times": full.size,
"project_name": "MMS",
"species": specie,
"charge": charge,
"units_name": "df_cm",
"mass": mass,
}
return out