Source code for pyrfu.mms.hpca_pad

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Built-in imports
import warnings

# 3rd party imports
import numpy as np
import xarray as xr
from scipy import interpolate

# Local imports
from ..pyrf.normalize import normalize
from ..pyrf.resample import resample
from ..pyrf.time_clip import time_clip
from ..pyrf.ts_scalar import ts_scalar

__author__ = "Louis Richard"
__email__ = "louisr@irfu.se"
__copyright__ = "Copyright 2020-2023"
__license__ = "MIT"
__version__ = "2.4.2"
__status__ = "Prototype"


def _hpca_elevations():
    anode_theta = [
        123.75000,
        101.25000,
        78.750000,
        56.250000,
        33.750000,
        11.250000,
        11.250000,
        33.750000,
        56.250000,
        78.750000,
        101.25000,
        123.75000,
        146.25000,
        168.75000,
        168.75000,
        146.25000,
    ]

    anode_theta[6:14] = [anode_val + 180.0 for anode_val in anode_theta[6:14]]

    return anode_theta


[docs]def hpca_pad(vdf, saz, aze, b_xyz, elim=None): r"""Computes HPCA pitch angle distribution. Parameters ---------- vdf : xarray.DataArray Ion PSD or flux; [nt, npo16, ner63], looking direction saz : xarray.DataArray Start index of azimuthal angle; [nt, 1], (0 - 15) aze : xarray.DataArray Azimuthal angle per energy; [nT, naz16, npo16, ner63] b_xyz : xarray.DataArray B in dmpa coordinate elim : list, Optional [emin, emax], energy range for PAD Returns ------- pad_spec : xarray.DataArray PAD spectrum Examples -------- >>> pad_ = hpca_pad(vdf, saz, aze, elev, ien, b_xyz, elim=[500, 3000]) """ # 1. get data ien = vdf.ccomp elev = _hpca_elevations() if elim is None: elim = [ien.data[0], ien.data[-1]] # azimuthal angle # n_az = 16 # 2. set tint and TLIM(dist & startaz) n_start_az = np.where(saz.data == 0)[0][0] start_time_match = aze.time.data[0].view("i8") start_time_match -= vdf.time.data[n_start_az].view("i8") if start_time_match < 1e6: warnings.warn("start times of aze and vdf match.", UserWarning) else: raise ValueError("start times of aze and vdf don't match!") n_stop_az = np.where(saz.data == 15)[0][-1] if (n_stop_az - n_start_az + 1) // n_az == len(aze): warnings.warn("stop times of aze and vdf match.", UserWarning) else: raise ValueError("stop times of aze and vdf don't match!") tint_ = [saz.time.data[n_start_az], saz.time.data[n_stop_az]] tint_ = [np.datetime_as_string(time, "ns") for time in tint_] vdf = time_clip(vdf, tint_) # 3. compute PAD # 3.1. pitchangle # Default pitch angles. 15 degree angle widths angle_vec = np.linspace(15, 180, 12) d_angle = np.median(np.diff(angle_vec)) * np.ones(len(angle_vec)) pitch_a = angle_vec - d_angle / 2 # 3.2.data dimension n_po, n_en, n_ta, n_ti = len(elev), len(ien), len(aze), len(vdf) assert n_ti == n_ta * n_az, "aze and vdf don't match!" tt_ = vdf.time.data # 3.3.reshape aze to aze_mat aze_mat = np.transpose(aze.data, [1, 0, 2, 3]) # [nt, npo, ner] aze_mat = np.reshape(aze_mat, [n_ti, n_po, n_en]) elev_mat = np.tile(elev, (n_ti, n_en, 1)) # [nt, npo, ner] elev_mat = np.transpose(elev_mat, [0, 2, 1]) xx_ = np.sin(np.deg2rad(elev_mat)) * np.cos(np.deg2rad(aze_mat)) yy_ = np.sin(np.deg2rad(elev_mat)) * np.sin(np.deg2rad(aze_mat)) zz_ = np.cos(np.deg2rad(elev_mat)) t0_ = vdf.time.data.astype(np.int64) t0_start = t0_[0] t0_ -= t0_start tck_ = interpolate.interp1d( np.arange(0, n_en * len(t0_), n_en), t0_, fill_value="extrapolate", ) t1_tt = tck_(np.arange(0, n_en * len(t0_))) + t0_start t1_tt = t1_tt.astype("datetime64[ns]") b_xyz_r = resample(b_xyz, ts_scalar(t1_tt, np.zeros(len(t1_tt)))) b_xyz_r = normalize(b_xyz_r) b_x = np.transpose(np.reshape(b_xyz_r.data[:, 0], [n_en, n_ti])) b_x = np.transpose(np.tile(b_x, [n_po, 1, 1]), [1, 0, 2]) # [nt, npo, ner] b_y = np.transpose(np.reshape(b_xyz_r.data[:, 1], [n_en, n_ti])) b_y = np.transpose(np.tile(b_y, [n_po, 1, 1]), [1, 0, 2]) # [nt, npo, ner] b_z = np.transpose(np.reshape(b_xyz_r.data[:, 2], [n_en, n_ti])) b_z = np.transpose(np.tile(b_z, [n_po, 1, 1]), [1, 0, 2]) # [nt, npo, ner] theta_b = np.arccos(xx_ * b_x + yy_ * b_y + zz_ * b_z) theta_b = np.rad2deg(theta_b) # 3.5.select dist for PAD vdfs_ = [vdf.data.copy() for _ in range(len(angle_vec))] vdfs_[0][theta_b > angle_vec[0]] = np.nan for i, vdf_ in enumerate(vdfs_): vdf_[theta_b < (angle_vec[i] - d_angle[i])] = np.nan vdf_[theta_b > angle_vec[i]] = np.nan vdfs_[-1][theta_b < (angle_vec[-1] - d_angle[len(angle_vec) - 1])] = np.nan # [n_ti, n_po, n_en] --> [n_ti, n_en] vdfs_ = [np.squeeze(np.nanmean(vdf_, axis=1)) for vdf_ in vdfs_] # average among energy dimension i_elim = np.argmin(abs(ien.data - elim[0])) e_min = ien.data[i_elim] j_elim = np.argmin(abs(ien.data - elim[1])) e_max = ien.data[j_elim] messsage = f"PSD/pflux pitch angle dist. from {e_min} [eV] to {e_max} [eV]" warnings.warn(messsage, UserWarning) vdfs_ = [np.nanmean(vdf_[:, i_elim:j_elim], axis=1) for vdf_ in vdfs_] vdfs_ = [np.squeeze(vdf_) for vdf_ in vdfs_] # [n_ti, n_en] --> [n_ti] padd_ = np.transpose(np.stack(vdfs_)) # [nt, ner, npitcha12] # 3.6.make spectrum coords = [tt_, pitch_a] dims = ["time", "theta"] pad_spec = xr.DataArray(padd_, coords=coords, dims=dims) return pad_spec