#!/usr/bin/env python
# -*- coding: utf-8 -*-
import logging
# Built-in imports
import warnings
# 3rd party imports
import numpy as np
import xarray as xr
# Local imports
from ..pyrf.normalize import normalize
from ..pyrf.resample import resample
from ..pyrf.time_clip import time_clip
__author__ = "Louis Richard"
__email__ = "louisr@irfu.se"
__copyright__ = "Copyright 2020-2023"
__license__ = "MIT"
__version__ = "2.4.2"
__status__ = "Prototype"
logging.captureWarnings(True)
logging.basicConfig(
format="[%(asctime)s] %(levelname)s: %(message)s",
datefmt="%d-%b-%y %H:%M:%S",
level=logging.INFO,
)
[docs]def get_pitch_angle_dist(vdf, b_xyz, tint: list = None, **kwargs):
r"""Computes the pitch angle distributions from l1b brst particle data.
Parameters
----------
vdf : xarray.Dataset
to fill
b_xyz : xarray.DataArray
to fill
tint : list of str, Optional
Time interval for closeup.
Returns
-------
pad : xarray.DataArray
Particle pitch angle distribution
Other Parameters
----------------
angles : int or float or list of ndarray
User defined angles.
meanorsum : {'mean', 'sum', 'sum_weighted'}
Method.
Examples
--------
>>> from pyrfu import mms
Define time intervals
>>> tint_long = ["2017-07-24T12:48:34.000", "2017-07-24T12:58:20.000"]
>>> tint_zoom = ["2017-07-24T12:49:18.000", "2017-07-24T12:49:30.000"]
Load ions velocity distribution for MMS1
>>> vdf_i = mms.get_data("pdi_fpi_brst_l2", tint_long, 1)
Load magnetic field in the spacecraft coordinates system.
>>> b_dmpa = mms.get_data("b_dmpa_fgm_brst_l2", tint_long, 1)
Compute pitch angle distribution
>>> options = dict(angles=24)
>>> pad_i = mms.get_pitch_angle_dist(vdf, b_dmpa, tint_zoom, **options)
"""
# Default pitch angles. 15 degree angle widths
angles_v = np.linspace(15, 180, int(180 / 15))
d_angles = np.median(np.diff(angles_v)) * np.ones(len(angles_v))
if "angles" in kwargs:
if isinstance(kwargs["angles"], (int, float)):
n_angles = int(kwargs["angles"]) # Make sure input is integer
d_angles = 180 / n_angles
angles_v = np.linspace(d_angles, 180, n_angles)
d_angles = d_angles * np.ones(n_angles)
logging.info("User defined number of pitch angles.")
elif isinstance(kwargs["angles"], (list, np.ndarray)):
angles_v = kwargs["angles"]
d_angles = np.diff(angles_v)
angles_v = angles_v[1:]
logging.info("User defined pitch angle limits.")
else:
raise ValueError("angles parameter not understood.")
# Method
mean_or_sum = kwargs.get("meanorsum", "mean")
assert mean_or_sum in ["mean", "sum"], "meanorsum param. not understood."
pitch_angles = angles_v - d_angles / 2
n_angles = len(angles_v)
time = vdf.time.data
vdf0 = vdf.data.copy()
if vdf.phi.data.ndim == 1:
phi = np.tile(vdf.phi.data, (len(time), 1))
phi = xr.DataArray(
phi,
coords=[time, np.arange(len(phi))],
dims=["time", "idx1"],
)
else:
phi = vdf.phi
theta = vdf.theta
if "energy0" in vdf.attrs.keys() and "energy1" in vdf.attrs.keys():
energy0, _ = [vdf.attrs[f"energy{i}"] for i in range(2)]
else:
energy0, _ = vdf.energy.data[:2, :]
if tint is not None:
b_xyz = time_clip(b_xyz, tint)
vdf0 = time_clip(vdf0, tint)
phi = time_clip(phi, tint)
time = vdf0.time.data
# Check size of energy
n_en, n_phi, n_theta = [len(energy0), len(phi.data[0, :]), len(theta)]
b_xyz = resample(b_xyz, vdf0)
b_vec = normalize(b_xyz)
b_vec_x = np.transpose(
np.tile(b_vec.data[:, 0], [n_en, n_phi, n_theta, 1]),
[3, 0, 1, 2],
)
b_vec_y = np.transpose(
np.tile(b_vec.data[:, 1], [n_en, n_phi, n_theta, 1]),
[3, 0, 1, 2],
)
b_vec_z = np.transpose(
np.tile(b_vec.data[:, 2], [n_en, n_phi, n_theta, 1]),
[3, 0, 1, 2],
)
x_vec = np.zeros((len(time), n_phi, n_theta))
y_vec = np.zeros((len(time), n_phi, n_theta))
z_vec = np.zeros((len(time), n_phi, n_theta))
for i in range(len(time)):
x_vec[i, ...] = np.dot(
-np.cos(np.deg2rad(phi.data[i, None])).T,
np.sin(np.deg2rad(theta.data[:, None])).T,
)
y_vec[i, ...] = np.dot(
-np.sin(np.deg2rad(phi.data[i, None])).T,
np.sin(np.deg2rad(theta.data[:, None])).T,
)
z_vec[i, ...] = np.dot(
-np.ones((n_phi, 1)),
np.cos(np.deg2rad(theta.data[:, None])).T,
)
if tint is not None:
energy = time_clip(vdf.energy, tint).data
else:
energy = vdf.energy.data
x_mat = np.squeeze(
np.transpose(np.tile(x_vec, [n_en, 1, 1, 1]), [1, 0, 2, 3]),
)
y_mat = np.squeeze(
np.transpose(np.tile(y_vec, [n_en, 1, 1, 1]), [1, 0, 2, 3]),
)
z_mat = np.squeeze(
np.transpose(np.tile(z_vec, [n_en, 1, 1, 1]), [1, 0, 2, 3]),
)
theta_b = np.rad2deg(
np.arccos(
x_mat * np.squeeze(b_vec_x)
+ y_mat * np.squeeze(b_vec_y)
+ z_mat * np.squeeze(b_vec_z),
),
)
dists = [vdf0.data.copy() for _ in range(n_angles)]
pad_arr = [None] * n_angles
for i in range(n_angles):
dists[i][theta_b < angles_v[i] - d_angles[i]] = np.nan
dists[i][theta_b > angles_v[i]] = np.nan
with warnings.catch_warnings():
warnings.simplefilter("ignore", category=RuntimeWarning)
if mean_or_sum == "mean":
pad_arr[i] = np.squeeze(
np.nanmean(np.nanmean(dists[i], axis=3), axis=2),
)
elif mean_or_sum == "sum":
pad_arr[i] = np.squeeze(
np.nansum(np.nansum(dists[i], axis=3), axis=2),
)
else:
raise ValueError("Invalid method")
pad_arr = np.transpose(np.stack(pad_arr), [1, 0, 2])
energy = np.mean(energy[:2, :], axis=0)
pad = xr.Dataset(
{
"data": (
["time", "idx0", "idx1"],
np.transpose(pad_arr, [0, 2, 1]),
),
"energy": (["time", "idx0"], np.tile(energy, (len(pad_arr), 1))),
"theta": (
["time", "idx1"],
np.tile(pitch_angles, (len(pad_arr), 1)),
),
"time": time,
"idx0": np.arange(len(energy)),
"idx1": np.arange(len(pitch_angles)),
},
)
pad.attrs = vdf.attrs
pad.attrs["mean_or_sum"] = mean_or_sum
pad.attrs["delta_pitchangle_minus"] = d_angles * 0.5
pad.attrs["delta_pitchangle_plus"] = d_angles * 0.5
pad.time.attrs = vdf.time.attrs
pad.energy.attrs = vdf.energy.attrs
pad.data.attrs["UNITS"] = vdf.data.attrs["UNITS"]
return pad