Source code for pyrfu.mms.psd_rebin

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Built-in imports
from typing import Tuple, Union

# 3rd party
import numpy as np
from numpy.typing import NDArray
from xarray.core.dataset import Dataset

__author__ = "Louis Richard"
__email__ = "louisr@irfu.se"
__copyright__ = "Copyright 2020-2024"
__license__ = "MIT"
__version__ = "2.4.13"
__status__ = "Prototype"


[docs]def psd_rebin( vdf: Dataset, phi: NDArray[np.float32], energy0: NDArray[np.float32], energy1: NDArray[np.float32], esteptable: NDArray[np.uint8], ) -> Tuple[ NDArray[np.datetime64], NDArray[Union[np.float32, np.float64]], NDArray[np.float32], NDArray[np.float32], ]: r"""Convert burst mode distribution into 64 energy channel distribution. Takes the burst mode distribution sampled in two energy tables and converts to a single energy table with 64 energy channels. Time resolution is halved and phi angles are averaged over adjacent times. Parameters ---------- vdf : Dataset Time series of the particle distribution. phi : numpy.ndarray Time series of the phi angles. energy0 : numpy.ndarray Energy table 0. energy1 : numpy.ndarray Energy table 1. esteptable : numpy.ndarray Time series of the stepping table between energies (burst). Returns ------- tuple * time_r : numpy.ndarray Time array of the recompiled distribution. * vdf_r : numpy.ndarray Recompiled distribution. * energy_r : numpy.ndarray Recompiled energy table. * phi_r : numpy.ndarray Recompiled phi angles. Raises ------ TypeError If vdf is not a xarray.Dataset or if phi is not a numpy.ndarray or if energy0 is not a numpy.ndarray or if energy1 is not a numpy.ndarray or if esteptable is not a numpy.ndarray. Notes ----- I'm assuming no gaps in the burst data interval. If there is a gap use time_clip before running. To be updated later. """ if not isinstance(vdf, Dataset): raise TypeError("vdf must be a xarray.Dataset") if not isinstance(phi, np.ndarray): raise TypeError("phi must be a numpy.ndarray") if not isinstance(energy0, np.ndarray): raise TypeError("energy0 must be a numpy.ndarray") if not isinstance(energy1, np.ndarray): raise TypeError("energy1 must be a numpy.ndarray") if not isinstance(esteptable, np.ndarray): raise TypeError("step_table must be a numpy.ndarray") # Get time and data vdf_time: NDArray[np.datetime64] = vdf.time.data vdf_data: NDArray[np.float64] = vdf.data.data.astype(np.float64) # Sort energy levels energy_r: NDArray[np.float32] = np.sort(np.hstack([energy0, energy1])) # Define new times delta_t: float = np.median(np.diff(vdf_time)).astype(np.int16) / 1e9 time_r: NDArray[np.datetime64] = vdf_time[:-1:2] + np.timedelta64( int(delta_t * 1e9 / 2), "ns" ) # Preallocate output arrays vdf_r: NDArray[np.float64] = np.zeros( (len(time_r), 64, *vdf.data.shape[2:]), dtype=np.float64 ) phi_r: NDArray[np.float32] = np.zeros( (len(time_r), vdf.data.shape[2]), dtype=np.float32 ) phi_s: NDArray[np.float32] = np.roll(phi, 2, axis=1) phi_s[:, 0] = phi_s[:, 0] - 360.0 time_indices: NDArray[np.int16] = np.arange(0, len(vdf.time) - 1, 2, dtype=np.int16) for new_el_num, idx in enumerate(time_indices[:-1]): if phi[idx, 0] > phi[idx + 1, 0]: phi_r[new_el_num, :] = (phi[idx, :] + phi_s[idx + 1, :]) / 2 vdf_temp: NDArray[np.float32] = np.roll( np.squeeze(vdf_data[idx + 1, ...]), 2, axis=1, ) if esteptable[idx]: vdf_r[new_el_num, 1:64:2, ...] = vdf_data[idx, ...] vdf_r[new_el_num, 0:63:2, ...] = vdf_temp else: vdf_r[new_el_num, 1:64:2, ...] = vdf_data[idx, ...] vdf_r[new_el_num, 0:63:2, ...] = vdf_temp else: phi_r[new_el_num, :] = phi[idx, :] + phi[idx + 1, :] phi_r[new_el_num, :] /= 2 if esteptable[idx]: vdf_r[new_el_num, 1:64:2, ...] = vdf_data[idx, ...] vdf_r[new_el_num, 0:63:2, ...] = vdf.data.data[idx + 1, ...] else: vdf_r[new_el_num, 1:64:2, ...] = vdf_data[idx + 1, ...] vdf_r[new_el_num, 0:63:2, ...] = vdf_data[idx, ...] out = (time_r, vdf_r.astype(vdf.data.data.dtype), energy_r, phi_r) return out