# Source code for pyrfu.pyrf.ebsp

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Built-in imports
import logging
import os
import warnings

# 3rd party imports
import numba
import numpy as np
import xarray as xr
from scipy import fft

from .calc_fs import calc_fs
from .cart2sph import cart2sph
from .convert_fac import convert_fac
from .resample import resample

# Local imports
from .ts_time import ts_time
from .ts_vec_xyz import ts_vec_xyz
from .unix2datetime64 import unix2datetime64

__author__ = "Louis Richard"
__email__ = "louisr@irfu.se"
__copyright__ = "Copyright 2020-2023"
__license__ = "MIT"
__version__ = "2.4.2"
__status__ = "Prototype"

# Route Python warnings through the logging system and set a timestamped
# log format so the resampling/interpolation messages below are traceable.
logging.captureWarnings(True)
logging.basicConfig(
    format="[%(asctime)s] %(levelname)s: %(message)s",
    datefmt="%d-%b-%y %H:%M:%S",
    level=logging.INFO,
)


def _checksampling(e_xyz, db_xyz, b_xyz, b_bgd, flag_no_resamp):
    """Put E and B on a common time line.

    Compares the sampling frequencies of the electric field ``e_xyz`` and of
    the magnetic field fluctuations ``db_xyz`` and resamples the slower (or
    both) so that all fields share one time line. Returns the (possibly
    resampled) fields together with the common sampling frequency.

    Parameters
    ----------
    e_xyz : xarray.DataArray
        Wave electric field (must not be None).
    db_xyz : xarray.DataArray
        Wave magnetic field fluctuations.
    b_xyz : xarray.DataArray
        High-resolution background magnetic field.
    b_bgd : xarray.DataArray
        Background magnetic field.
    flag_no_resamp : bool
        If True, assume E and B are already on the same time line.

    Returns
    -------
    tuple
        (e_xyz, db_xyz, b_xyz, b_bgd, fs_) with fs_ the common sampling
        frequency.
    """
    assert e_xyz is not None

    fs_e, fs_b = [calc_fs(e_xyz), calc_fs(db_xyz)]

    resample_b_options = {"f_s": fs_b}

    if flag_no_resamp:
        # Caller promised a shared time line; just sanity-check the rates.
        assert fs_e == fs_b
        fs_ = fs_e
    else:
        if fs_b > 1.5 * fs_e:
            # B is sampled notably faster: interpolate E (and b0) onto B.
            e_xyz = resample(e_xyz, db_xyz, **resample_b_options)
            b_bgd = resample(b_bgd, db_xyz, **resample_b_options)

            fs_ = fs_b
            logging.info("Interpolating e to b")
        elif fs_e > 1.5 * fs_b:
            # E is sampled notably faster: interpolate B (and b0) onto E.
            db_xyz = resample(db_xyz, e_xyz)
            b_bgd = resample(b_bgd, e_xyz)

            fs_ = fs_e
            logging.info("Interpolating b to e")
        elif fs_e == fs_b and len(e_xyz) == len(db_xyz):
            # Same rate and same number of samples: nothing to do.
            fs_ = fs_e
        else:
            # Same nominal rate but different time lines: build a new uniform
            # time line at twice the E sampling over the overlapping interval
            # and resample everything onto it.
            fs_ = 2 * fs_e
            # Overlap start/end in seconds (datetime64[ns] -> unix seconds).
            start_time = np.max(
                [
                    e_xyz.time.data[0].astype(np.float64) / 1e9,
                    db_xyz.time.data[0].astype(np.float64) / 1e9,
                ]
            )
            end_time = np.min(
                [
                    e_xyz.time.data[-1].astype(np.float64) / 1e9,
                    db_xyz.time.data[-1].astype(np.float64) / 1e9,
                ]
            )

            nt = np.floor((end_time - start_time) * fs_).astype(np.int64)

            t = np.linspace(start_time, end_time, nt)

            t = ts_time(t)

            e_xyz = resample(e_xyz, t)
            b_bgd = resample(b_bgd, t)
            b_xyz = resample(b_xyz, t)
            db_xyz = resample(db_xyz, t)

            logging.info("Interpolating b and e to 2x e sampling")

    return e_xyz, db_xyz, b_xyz, b_bgd, fs_


def _b_elevation(b_x, b_y, b_z, angle_b_elevation_max):
    # Remove the last sample if the total number of samples is odd
    b_x = b_x[: int(2 * (len(b_x) // 2))]
    b_y = b_y[: int(2 * (len(b_y) // 2))]
    b_z = b_z[: int(2 * (len(b_z) // 2))]

    angle_b_elevation = np.arctan(b_z / np.sqrt(b_x**2 + b_y**2))
    angle_b_elevation = np.rad2deg(angle_b_elevation)
    idx_b_par_spin_plane = np.abs(angle_b_elevation) < angle_b_elevation_max

    return angle_b_elevation, idx_b_par_spin_plane


def _freq_int(freq_int, delta_b):
    start_time = delta_b.time.data[0].astype(np.float64) / 1e9
    end_time = delta_b.time.data[-1].astype(np.float64) / 1e9

    pc12_range, other_range = [False, False]

    if isinstance(freq_int, str):
        if freq_int.lower() == "pc35":
            freq_int = [0.002, 0.1]

            delta_t = 60  # local
        else:
            pc12_range = True

            freq_int = [0.1, 5.0]

            delta_t = 1  # local

        fs_out = 1 / delta_t
    else:
        if freq_int[1] >= freq_int[0]:
            other_range = True

            fs_out = freq_int[1] / 5

            delta_t = 1 / fs_out  # local
        else:
            raise ValueError("FREQ_INT must be [f_min f_max], f_min<f_max")

    nt = np.floor((end_time - start_time) / delta_t).astype(np.int64)

    out_time = np.linspace(start_time, end_time, nt, dtype=np.float64)
    out_time += delta_t / 2.0
    out_time = out_time[:-1]

    any_range = [pc12_range, other_range]

    return any_range, freq_int, fs_out, out_time


@numba.jit(cache=True, nogil=True, parallel=True, nopython=True, fastmath=True)
def _average_data(data=None, x=None, y=None, av_window=None):
    # Average `data`, sampled at times `x`, onto the (coarser) time line `y`
    # using a sliding window of width `av_window` seconds (defaults to the
    # median spacing of `y`). NaN samples are ignored in each window mean.
    # NOTE(review): keyword defaults look unusual for a numba nopython
    # function but `data`, `x`, `y` are always supplied by callers in this
    # file; only `av_window` is sometimes omitted.

    # Median sample spacings of input and output time lines.
    dtx, dty = [np.median(np.diff(x)), np.median(np.diff(y))]

    if av_window is None:
        av_window = dty

    dt2 = av_window / 2

    n_data_out = len(y)

    # Pad data with NaNs from each side so windows overhanging the edges
    # still have something to index into (NaNs are skipped by nanmean).
    n_point_to_add = int(np.ceil(dt2 / dtx))
    pad_nan = np.ones((n_point_to_add, data.shape[1]), dtype="complex128") * np.nan
    data_padded = np.vstack((pad_nan, data, pad_nan))

    # Extend the input time line to match the padded data.
    x_pad_pref = np.linspace(x[0] - dtx * (n_point_to_add - 1), x[0], n_point_to_add)
    x_pad_suff = np.linspace(x[-1], x[-1] + dtx * (n_point_to_add - 1), n_point_to_add)
    x_padded = np.hstack((x_pad_pref, x, x_pad_suff))

    out = np.zeros((n_data_out, data.shape[1]), dtype="complex128")

    # Left/right window edge indices for every output sample.
    il = np.digitize(y - dt2, x_padded)
    ir = np.digitize(y + dt2, x_padded)

    # Parallel loop over output samples; each is the NaN-ignoring mean of
    # its window, per column.
    for i in numba.prange(len(y)):
        for j in range(data.shape[1]):
            out[i, j] = np.nanmean(data_padded[il[i] : ir[i], j])

    return out


def _bb_xxyyzzss(power_bx_plot, power_by_plot, power_bz_plot, power_2b_plot):
    bb_xxyyzzss = np.tile(power_bx_plot[:, :, np.newaxis], (1, 1, 4))
    bb_xxyyzzss[:, :, 1] = power_by_plot
    bb_xxyyzzss[:, :, 2] = power_bz_plot
    bb_xxyyzzss[:, :, 3] = power_2b_plot
    return np.real(bb_xxyyzzss)


def _ee_xxyyzzss(power_ex_plot, power_ey_plot, power_ez_plot, power_2e_plot):
    ee_xxyyzzss = np.tile(power_ex_plot[..., np.newaxis], (1, 1, 4))
    ee_xxyyzzss[:, :, 1] = power_ey_plot
    ee_xxyyzzss[:, :, 2] = power_ez_plot
    ee_xxyyzzss[:, :, 3] = power_2e_plot
    return np.real(ee_xxyyzzss)


@numba.jit(cache=True, nogil=True, parallel=True, nopython=True, fastmath=True)
def _censure_plot(inp, idx_nan, censure, n_data, a_):
    # Blank out (set to NaN) the `censure[j]` samples adjacent to every data
    # gap in `inp`, per frequency column j, to remove wavelet edge effects
    # around gaps. `idx_nan` marks gap samples on the input time line.
    # NOTE(review): `idx_nan[i] < idx_nan[i+1]` is a False->True transition
    # (gap starts after i) and `>` a True->False transition (gap ends at
    # i+1) — presumably intended to censure the window before/after each
    # gap; confirm against the MATLAB irfu ebsp implementation.
    out = inp.copy()

    for i in numba.prange(len(idx_nan) - 1):
        for j in range(len(a_)):
            if idx_nan[i] < idx_nan[i + 1]:
                # Entering a gap: blank the preceding censure window.
                out[int(max([i - censure[j], 0])) : i, j] = np.nan

            if idx_nan[i] > idx_nan[i + 1]:
                # Leaving a gap: blank the following censure window.
                out[i : int(min([i + censure[j], n_data])), j] = np.nan

    return out


def ebsp(e_xyz, db_xyz, b_xyz, b_bgd, xyz, freq_int, **kwargs):
    r"""Calculates wavelet spectra of E&B and Poynting flux using wavelets
    (Morlet wavelet). Also computes polarization parameters of B using SVD
    [7]_. SVD is performed on spectral matrices computed from the time
    series of B using wavelets and then averaged over a number of wave
    periods.

    Parameters
    ----------
    e_xyz : xarray.DataArray
        Time series of the wave electric field.
    db_xyz : xarray.DataArray
        Time series of the wave magnetic field.
    b_xyz : xarray.DataArray
        Time series of the high resolution background magnetic field used
        for E.B=0.
    b_bgd : xarray.DataArray
        Time series of the background magnetic field used for field aligned
        coordinates.
    xyz : xarray.DataArray
        Time series of the position time series of spacecraft used for
        field aligned coordinates.
    freq_int : str or array_like
        Frequency interval :
            * "pc12" : [0.1, 5.0]
            * "pc35" : [2e-3, 0.1]
            * [fmin, fmax] : arbitrary interval [fmin, fmax]

    Returns
    -------
    res : xarray.Dataset
        Dataset with :
            * t : xarray.DataArray
                Time.
            * f : xarray.DataArray
                Frequencies.
            * bb_xxyyzzss : xarray.DataArray
                delta_b power spectrum with : [...,0] : x ; [...,1] : y ;
                [...,2] : z ; [...,3] : sum.
            * ee_xxyyzzss : xarray.DataArray
                E power spectrum with : [...,0] : x ; [...,1] : y ;
                [...,2] : z ; [...,3] : sum.
            * ee_ss : xarray.DataArray
                E power spectrum (xx+yy spacecraft coordinates, e.g. ISR2).
            * pf_xyz : xarray.DataArray
                Poynting flux (xyz).
            * pf_rtp : xarray.DataArray
                Poynting flux (r, theta, phi) [angles in degrees].
            * dop : xarray.DataArray
                3D degree of polarization.
            * dop2d : xarray.DataArray
                2D degree of polarization in the polarization plane.
            * planarity : xarray.DataArray
                Planarity of polarization.
            * ellipticity : xarray.DataArray
                Ellipticity of polarization ellipse.
            * k : xarray.DataArray
                k-vector (theta, phi FAC) [angles in degrees].

    Other Parameters
    ----------------
    polarization : bool
        Computes polarization parameters. Default False.
    no_resample : bool
        No resampling, E and delta_b are given at the same time line.
        Default False.
    fac : bool
        Uses FAC coordinate system (defined by b0 and optionally xyz),
        otherwise no coordinate system transformation is performed.
        Default True.
    de_dot_b0 : bool
        Computes dEz from delta_b dot B = 0, uses full_b. Default False.
    full_b_db : bool
        delta_b contains DC field. Default False.
    nav : int
        Number of wave periods to average Default 8.
    fac_matrix : numpy.ndarray
        Specify rotation matrix to FAC system Default None.
    m_width_coeff : int or float
        Specify coefficient to multiple Morlet wavelet width by. Default 1.

    See also
    --------
    pyrfu.plot.pl_ebsp : to fill.
    pyrfu.pyrf.convert_fac : Transforms to a field-aligned coordinate.

    Notes
    -----
    This software was developed as part of the MAARBLE (Monitoring,
    Analyzing and Assessing Radiation Belt Energization and Loss)
    collaborative research project which has received funding from the
    European Community's Seventh Framework Programme (FP7-SPACE-2011-1)
    under grant agreement n. 284520.

    References
    ----------
    .. [7] Santolík, O., Parrot. M., and Lefeuvre. F. (2003) Singular value
           decomposition methods for wave propagation analysis, Radio Sci.,
           38(1), 1010, doi : https://doi.org/10.1029/2000RS002523 .

    Examples
    --------
    >>> from pyrfu import mms, pyrf
    >>> # Time interval
    >>> tint_brst = ["2015-10-30T05:15:42.000", "2015-10-30T05:15:54.000"]
    >>> # Spacecraft index
    >>> mms_id = 3
    >>> # Load spacecraft position
    >>> tint_long = pyrf.extend_tint(tint_brst, [-100, 100])
    >>> r_xyz = mms.get_data("R_gse", tint_long, mms_id)
    >>> # Load background magnetic field, electric field and magnetic field
    >>> # fluctuations
    >>> b_xyz = mms.get_data("B_gse_fgm_brst_l2", tint_brst, mms_id)
    >>> e_xyz = mms.get_data("E_gse_edp_brst_l2", tint_brst, mms_id)
    >>> b_scm = mms.get_data("B_gse_scm_brst_l2", tint_brst, mms_id)
    >>> # Polarization analysis
    >>> options = dict(polarization=True, fac=True)
    >>> polarization = pyrf.ebsp(e_xyz, b_scm, b_xyz, b_xyz, r_xyz,
    >>>                          freq_int=[10, 4000], **options)

    """
    assert isinstance(db_xyz, xr.DataArray), "delta_b must be a DataArray"
    assert isinstance(b_xyz, xr.DataArray), "full_b must be a DataArray"
    assert isinstance(b_bgd, xr.DataArray), "b0 must be a DataArray"

    message = "freq_int must be a string or array_like"
    assert isinstance(freq_int, (list, np.ndarray, str)), message

    if isinstance(freq_int, (list, np.ndarray)):
        assert len(freq_int) == 2, "freq_int list must contain two elements"
    else:
        assert freq_int in ["pc12", "pc35"], "string freq_int must be pc12 or pc35"

    # Compute magnetic field fluctuations sampling frequency
    fs_b = calc_fs(db_xyz)

    # Below which we cannot apply E*B=0
    angle_b_elevation_max = 15.0

    want_ee = e_xyz is not None

    res = {
        "t": None,
        "f": None,
        "flagFac": 0,
        "bb_xxyyzzss": None,
        "ee_xxyyzzss": None,
        "ee_ss": None,
        "pf_xyz": None,
        "pf_rtp": None,
        "dop": None,
        "dop2d": None,
        "planarity": None,
        "ellipticity": None,
        "k_tp": None,
        "full_b": b_xyz,
        "b0": b_bgd,
        "r": xyz,
    }

    want_polarization = kwargs.get("polarization", False)
    flag_no_resample = kwargs.get("no_resample", False)
    flag_want_fac = kwargs.get("fac", True)
    flag_de_dot_b0 = kwargs.get("de_dot_b0", False)
    flag_full_b_db = kwargs.get("full_b_db", False)
    m_width_coeff = kwargs.get("m_width_coeff", 1.0)

    # Number of wave periods to average
    n_wave_period_to_average = kwargs.get("nav", 8)

    # matrix for rotation to FAC
    fac_matrix = kwargs.get("fac_matrix", None)

    if flag_want_fac and fac_matrix is None:
        if xyz is None:
            logging.info(
                "convert_fac : assuming s/c position [1 0 0] for estimating FAC"
            )
            xyz = [1, 0, 0]
            xyz = ts_vec_xyz(db_xyz.time.data, np.tile(xyz, (len(db_xyz), 1)))
        else:
            assert isinstance(xyz, xr.DataArray), "xyz must be a DataArray"

        xyz = resample(xyz, db_xyz, **{"f_s": fs_b})

    b_bgd = resample(b_bgd, db_xyz, **{"f_s": fs_b})

    if flag_full_b_db:
        # delta_b contains the DC field: keep it as full_b and subtract the
        # background to obtain the fluctuations.
        b_xyz = db_xyz
        res["full_b"] = b_xyz
        db_xyz = db_xyz - b_bgd

    any_range, freq_int, out_sampling, out_time = _freq_int(freq_int, db_xyz)
    pc12_range, other_range = any_range

    if want_ee:
        # Check the sampling rate
        temp_ = _checksampling(e_xyz, db_xyz, b_xyz, b_bgd, flag_no_resample)
        e_xyz, db_xyz, b_xyz, b_bgd, in_sampling = temp_
    else:
        in_sampling = calc_fs(db_xyz)
        e_xyz = None

    if in_sampling / 2 < freq_int[1]:
        raise ValueError("F_MAX must be lower than the Nyquist frequency")

    if want_ee and e_xyz.shape[1] < 3 and not flag_de_dot_b0:
        raise TypeError(
            "E must have all 3 components or flag de_dot_db=0 must be given"
        )

    # Keep an even number of samples (FFT symmetry below assumes it).
    if len(db_xyz) % 2:
        db_xyz = db_xyz[:-1, :]
        b_bgd = b_bgd[:-1, :]

        if fac_matrix is None:
            xyz = xyz[:-1, :]
        else:
            fac_matrix = fac_matrix[:-1, ...]

        if want_ee:
            e_xyz = e_xyz[:-1, :]

    in_time = db_xyz.time.data.astype(np.float64) / 1e9

    b_x, b_y, b_z = [None, None, None]
    idx_b_par_spin_plane = None

    if flag_de_dot_b0:
        b_x, b_y, b_z = [b_xyz[:, i].data for i in range(3)]

        # Remove the last sample if the total number of samples is odd
        _, idx_b_par_spin_plane = _b_elevation(b_x, b_y, b_z, angle_b_elevation_max)

    # If E has all three components, transform E and B waveforms to a magnetic
    # field aligned coordinate (FAC) and save eisr for computation of e_sum.
    # Otherwise we compute Ez within the main loop and do the transformation
    # to FAC there.
    time_b0 = 0

    if flag_want_fac:
        res["flagFac"] = True
        time_b0 = b_bgd.time.data.astype(np.float64) / 1e9

        if want_ee and not flag_de_dot_b0:
            eisr2 = e_xyz[:, :2]
            idx_nan_e = np.isnan(e_xyz.data)
            idx_nan_eisr2 = np.isnan(eisr2.data)

            if fac_matrix is None:
                e_xyz = convert_fac(e_xyz, b_bgd, xyz)
            else:
                e_xyz = convert_fac(e_xyz, fac_matrix)
        else:
            idx_nan_e = np.full((len(in_time), 3), False)
            eisr2 = None
            idx_nan_eisr2 = np.full((len(in_time), 2), False)

        if fac_matrix is None:
            db_xyz = convert_fac(db_xyz, b_bgd, xyz)
        else:
            db_xyz = convert_fac(db_xyz, fac_matrix)
    else:
        idx_nan_e = np.full((len(in_time), 3), False)
        eisr2 = None
        idx_nan_eisr2 = np.full((len(in_time), 2), False)

    # Find the frequencies for an FFT of all data and set important parameters
    nd2 = len(in_time) / 2
    freq = in_sampling * np.arange(nd2) / nd2 * 0.5

    # The frequencies corresponding to FFT
    w_ = np.hstack([0, freq, -np.flip(freq[:-1])])

    morlet_width = 5.36 * m_width_coeff  # to get proper overlap for Morlet

    freq_number = np.ceil(
        (np.log10(freq_int[1]) - np.log10(freq_int[0])) * 12 * m_width_coeff
    )

    a_min, a_max = [
        np.log10(0.5 * in_sampling / freq_int[1]),
        np.log10(0.5 * in_sampling / freq_int[0]),
    ]
    a_number = freq_number

    a_ = np.logspace(a_min, a_max, int(a_number))

    # Maximum frequency
    w_0 = in_sampling / 2

    # Width of the Morlet wavelet
    sigma = morlet_width / w_0

    # Make the FFT of all data
    idx_nan_b = np.isnan(db_xyz.data)
    db_xyz.data[idx_nan_b] = 0
    swb = fft.fft(db_xyz.data, axis=0, workers=os.cpu_count())

    sw_e, sw_eisr2 = [None, None]

    if want_ee:
        logging.info("ebsp ... calculate E and B wavelet transform ... ")
        e_xyz.data[idx_nan_e] = 0.0
        sw_e = fft.fft(e_xyz.data, axis=0, workers=os.cpu_count())

        if flag_want_fac and not flag_de_dot_b0:
            eisr2.data[idx_nan_eisr2] = 0.0
            sw_eisr2 = fft.fft(eisr2.data, axis=0, workers=os.cpu_count())
    else:
        logging.info("ebsp ... calculate B wavelet transform ....")

    # Loop through all frequencies
    n_data, n_freq, n_data_out = [len(in_time), len(a_), len(out_time)]

    power_ex_plot = np.zeros((n_data, n_freq), dtype="complex128")
    power_ey_plot = np.zeros((n_data, n_freq), dtype="complex128")
    power_ez_plot = np.zeros((n_data, n_freq), dtype="complex128")
    power_2e_plot = np.zeros((n_data, n_freq), dtype="complex128")
    power_2e_isr2_plot = np.zeros((n_data, n_freq), dtype="complex128")

    power_bx_plot = np.zeros((n_data, n_freq), dtype="complex128")
    power_by_plot = np.zeros((n_data, n_freq), dtype="complex128")
    power_bz_plot = np.zeros((n_data, n_freq), dtype="complex128")
    power_2b_plot = np.zeros((n_data, n_freq), dtype="complex128")

    s_plot_x = np.zeros((n_data, n_freq))
    s_plot_y = np.zeros((n_data, n_freq))
    s_plot_z = np.zeros((n_data, n_freq))

    planarity, ellipticity = [np.zeros((n_data_out, n_freq)) for _ in range(2)]
    dop_3d = np.zeros((n_data_out, n_freq), dtype="complex128")
    dop_2d = np.zeros((n_data_out, n_freq), dtype="complex128")
    the_svd_fac = np.zeros((n_data_out, n_freq))
    phi_svd_fac = np.zeros((n_data_out, n_freq))

    # Get the correct frequencies for the wavelet transform
    frequency_vec = w_0 / a_

    censure = np.floor(2 * a_ * out_sampling / in_sampling * n_wave_period_to_average)

    for ind_a, a_0 in enumerate(a_):
        new_freq_mat = w_0 / a_0

        # resample to 1 second sampling for Pc1-2 or 1 minute sampling for
        # Pc3-5 average top frequencies to 1 second/1 minute below will be
        # an average over 8 wave periods. first find where one sample is less
        # than eight wave periods
        if frequency_vec[ind_a] / n_wave_period_to_average > out_sampling:
            av_window = 1 / out_sampling
        else:
            av_window = n_wave_period_to_average / frequency_vec[ind_a]

        # Get the wavelet transform by backward FFT
        w_exp_mat = np.exp(-sigma * sigma * ((a_0 * w_ - w_0) ** 2) / 2)
        w_exp_mat2 = np.tile(w_exp_mat[:, np.newaxis], (1, 2))
        w_exp_mat = np.tile(w_exp_mat[:, np.newaxis], (1, 3))

        wb = fft.ifft(np.sqrt(1) * swb * w_exp_mat, axis=0, workers=os.cpu_count())
        # Make sure it's an array (scipy.fft.ifft returns Any type)
        wb = np.array(wb)
        wb[idx_nan_b] = np.nan

        we, w_eisr2 = [None, None]

        if want_ee:
            we = fft.ifft(
                np.sqrt(1) * sw_e * w_exp_mat, axis=0, workers=os.cpu_count()
            )
            we = np.array(we)
            we[idx_nan_e] = np.nan

            if flag_want_fac and not flag_de_dot_b0:
                w_eisr2 = fft.ifft(
                    np.sqrt(1) * sw_eisr2 * w_exp_mat2, axis=0, workers=os.cpu_count()
                )
                w_eisr2 = np.array(w_eisr2)
                w_eisr2[idx_nan_eisr2] = np.nan

                # Power spectrum of E, power = (2*pi)*conj(W).*W./new_freq_mat
                power_2e_isr2_plot[:, ind_a] = np.sum(
                    2 * np.pi * (w_eisr2 * np.conj(w_eisr2)) / new_freq_mat, axis=1
                )
            else:
                # Power spectrum of E, power = (2*pi)*conj(W).*W./new_freq_mat
                power_2e_isr2_plot[:, ind_a] = np.sum(
                    2 * np.pi * (we * np.conj(we)) / new_freq_mat, axis=1
                )

            # Compute Ez from dE * B = 0
            if flag_de_dot_b0:
                we_re, we_im = [np.real(we), np.imag(we)]

                we_z = (
                    -(we_re[:, 0] * b_x + we_re[:, 1] * b_y) / b_z
                    - 1j * (we_im[:, 0] * b_x + we_im[:, 1] * b_y) / b_z
                )
                # Undefined where B lies too close to the spin plane.
                we_z[idx_b_par_spin_plane] = np.nan

                if flag_want_fac:
                    if fac_matrix is None:
                        tmp = np.vstack([np.transpose(we[:, :2]), np.transpose(we_z)])
                        arg_ = ts_vec_xyz(time_b0, np.transpose(tmp))
                        we = convert_fac(arg_, b_bgd, xyz)
                    else:
                        tmp = np.vstack([np.transpose(we[:, :2]), np.transpose(we_z)])
                        arg_ = ts_vec_xyz(time_b0, np.transpose(tmp))
                        we = convert_fac(arg_, fac_matrix)
                else:
                    we = np.transpose(
                        np.vstack([np.transpose(we[:, :2]), np.transpose(we_z)])
                    )

            power_e = 2 * np.pi * (we * np.conj(we)) / new_freq_mat
            power_e = np.vstack([power_e.T, np.sum(power_e, axis=1)]).T
            power_ex_plot[:, ind_a] = power_e[:, 0]
            power_ey_plot[:, ind_a] = power_e[:, 1]
            power_ez_plot[:, ind_a] = power_e[:, 2]
            power_2e_plot[:, ind_a] = power_e[:, 3]

            # Poynting flux calculations, assume E and b units mV/m and nT,
            # get S in uW/m^2 4pi from wavelets, see A. Tjulins power
            # estimates
            coeff_poynting = 10 / 4 / np.pi * (1 / 4) * (4 * np.pi)

            s = np.zeros((n_data, 3))

            we_x, we_y, we_z = [we[:, i] for i in range(3)]
            wb_x, wb_y, wb_z = [wb[:, i] for i in range(3)]

            s[:, 0] = (
                coeff_poynting
                * np.real(
                    we_y * np.conj(wb_z)
                    + np.conj(we_y) * wb_z
                    - we_z * np.conj(wb_y)
                    - np.conj(we_z) * wb_y
                )
                / new_freq_mat
            )
            s[:, 1] = (
                coeff_poynting
                * np.real(
                    we_z * np.conj(wb_x)
                    + np.conj(we_z) * wb_x
                    - we_x * np.conj(wb_z)
                    - np.conj(we_x) * wb_z
                )
                / new_freq_mat
            )
            s[:, 2] = (
                coeff_poynting
                * np.real(
                    we_x * np.conj(wb_y)
                    + np.conj(we_x) * wb_y
                    - we_y * np.conj(wb_x)
                    - np.conj(we_y) * wb_x
                )
                / new_freq_mat
            )

            s_plot_x[:, ind_a] = s[:, 0]
            s_plot_y[:, ind_a] = s[:, 1]
            s_plot_z[:, ind_a] = s[:, 2]

        # Power spectrum of B
        power_b = 2 * np.pi * (wb * np.conj(wb)) / new_freq_mat
        power_b = np.vstack([power_b.T, np.sum(power_b, axis=1)]).T

        power_bx_plot[:, ind_a] = power_b[:, 0]
        power_by_plot[:, ind_a] = power_b[:, 1]
        power_bz_plot[:, ind_a] = power_b[:, 2]
        power_2b_plot[:, ind_a] = power_b[:, 3]

        # Polarization parameters
        if want_polarization:
            # Construct spectral matrix and average it
            s_mat = np.zeros((n_data, 3, 3), dtype="complex128")

            for i in range(3):
                for j in range(3):
                    s_mat[:, i, j] = (
                        2 * np.pi * (wb[:, i] * np.conj(wb[:, j])) / new_freq_mat
                    )

            # Averaged s_mat
            s_mat_avg = np.zeros((n_data_out, 3, 3), dtype="complex128")

            for comp in range(3):
                s_mat_avg[..., comp] = _average_data(
                    s_mat[..., comp], in_time, out_time, av_window
                )

            # Remove data possibly influenced by edge effects
            censure_idx = np.hstack(
                [
                    np.arange(np.min([censure[ind_a], len(out_time)])),
                    np.arange(
                        np.max([0, len(out_time) - censure[ind_a] - 1]),
                        len(out_time),
                    ),
                ]
            )
            censure_idx = censure_idx.astype(np.int64)
            s_mat_avg[censure_idx, ...] = np.nan

            # compute singular value decomposition
            # real matrix which is superposition of real part of spectral
            # matrix over imaginary part
            a_mat, u_mat = [np.zeros((6, 3, n_data_out)) for _ in range(2)]
            w_mat, v_mat = [np.zeros((3, 3, n_data_out)) for _ in range(2)]
            # wSingularValues = zeros(3,n_data2);
            # R = zeros(3,3,n_data2); #spectral matrix in coordinate defined
            # by V axes

            a_mat[:3, ...] = np.real(np.transpose(s_mat_avg, [1, 2, 0]))
            a_mat[3:6, ...] = -np.imag(np.transpose(s_mat_avg, [1, 2, 0]))

            for i in range(n_data_out):
                if np.sum(np.isnan(a_mat[..., i])) != 0:
                    u_mat[..., i] = np.nan
                    w_mat[..., i] = np.nan
                    v_mat[..., i] = np.nan
                else:
                    uu_, ww_, vv_ = np.linalg.svd(a_mat[..., i], full_matrices=False)
                    u_mat[..., i] = uu_
                    # ww_ is a length-3 vector broadcast into the (3, 3)
                    # slice; singular value k is read back as w_mat[k, k, :].
                    w_mat[..., i] = ww_
                    v_mat[..., i] = vv_

            # compute direction of propagation
            sign_kz = np.sign(v_mat[2, 2, :])
            v_mat[2, 2, :] = v_mat[2, 2, :] * sign_kz
            v_mat[1, 2, :] = v_mat[1, 2, :] * sign_kz
            v_mat[0, 2, :] = v_mat[0, 2, :] * sign_kz

            the_svd_fac[:, ind_a] = np.abs(
                np.squeeze(
                    np.arctan(
                        np.sqrt(v_mat[0, 2, :] ** 2 + v_mat[1, 2, :] ** 2)
                        / v_mat[2, 2, :]
                    )
                )
            )
            phi_svd_fac[:, ind_a] = np.squeeze(
                np.arctan2(v_mat[1, 2, :], v_mat[0, 2, :])
            )

            # Calculate polarization parameters
            planarity_local = np.squeeze(1 - np.sqrt(w_mat[2, 2, :] / w_mat[0, 0, :]))
            planarity_local[censure_idx] = np.nan
            planarity[:, ind_a] = planarity_local

            # ellipticity: ratio of axes of polarization ellipse axes*sign of
            # polarization
            ellipticity_local = np.squeeze(w_mat[1, 1, :] / w_mat[0, 0, :]) * np.sign(
                np.imag(s_mat_avg[:, 0, 1])
            )
            ellipticity_local[censure_idx] = np.nan
            ellipticity[:, ind_a] = ellipticity_local

            # DOP = sqrt[(3/2.*trace(SM^2)./(trace(SM))^2 - 1/2)];
            # Samson, 1973, JGR
            dop = np.sqrt(
                (3 / 2)
                * (
                    np.trace(np.matmul(s_mat_avg, s_mat_avg), axis1=1, axis2=2)
                    / np.trace(s_mat_avg, axis1=1, axis2=2) ** 2
                )
                - 1 / 2
            )
            dop[censure_idx] = np.nan
            dop_3d[:, ind_a] = dop

            # DOP in 2D = sqrt[2*trace(rA^2)/trace(rA)^2 - 1)]; Ulrich
            v_mat_new = np.transpose(v_mat, [2, 0, 1])
            s_mat_avg2dim = np.matmul(
                v_mat_new, np.matmul(s_mat_avg, np.transpose(v_mat_new, [0, 2, 1]))
            )
            s_mat_avg2dim = s_mat_avg2dim[:, :2, :2]
            s_mat_avg = s_mat_avg2dim

            dop2dim = np.sqrt(
                2
                * (
                    np.trace(np.matmul(s_mat_avg, s_mat_avg), axis1=1, axis2=2)
                    / np.trace(s_mat_avg, axis1=1, axis2=2) ** 2
                )
                - 1
            )
            dop2dim[censure_idx] = np.nan
            # FIX: store the 2D degree of polarization. The original stored
            # `dop` (the 3D DOP) here, leaving `dop2dim` computed but unused.
            dop_2d[:, ind_a] = dop2dim

    # set data gaps to NaN and remove edge effects
    censure = np.floor(2 * a_)

    for ind_a in range(len(a_)):
        censure_idx = np.hstack(
            [
                np.arange(np.min([censure[ind_a], len(in_time)])),
                np.arange(np.max([1, len(in_time) - censure[ind_a]]), len(in_time)),
            ]
        )
        censure_idx = censure_idx.astype(np.int64)

        power_bx_plot[censure_idx, ind_a] = np.nan
        power_by_plot[censure_idx, ind_a] = np.nan
        power_bz_plot[censure_idx, ind_a] = np.nan
        power_2b_plot[censure_idx, ind_a] = np.nan

        if want_ee:
            power_ex_plot[censure_idx, ind_a] = np.nan
            power_ey_plot[censure_idx, ind_a] = np.nan
            power_ez_plot[censure_idx, ind_a] = np.nan
            power_2e_plot[censure_idx, ind_a] = np.nan
            power_2e_isr2_plot[censure_idx, ind_a] = np.nan

            s_plot_x[censure_idx, ind_a] = np.nan
            s_plot_y[censure_idx, ind_a] = np.nan
            s_plot_z[censure_idx, ind_a] = np.nan

    # remove edge effects from data gaps
    idx_nan_e = np.sum(idx_nan_e, axis=1) > 0
    idx_nan_b = np.sum(idx_nan_b, axis=1) > 0
    idx_nan_eisr2 = np.sum(idx_nan_eisr2, axis=1) > 0

    n_power_b = len(power_2b_plot)

    if pc12_range or other_range:
        censure3 = np.floor(1.8 * a_)
    else:
        censure3 = np.floor(0.4 * a_)

    # Censure magnetic field
    power_bx_plot = _censure_plot(power_bx_plot, idx_nan_b, censure3, n_power_b, a_)
    power_by_plot = _censure_plot(power_by_plot, idx_nan_b, censure3, n_power_b, a_)
    power_bz_plot = _censure_plot(power_bz_plot, idx_nan_b, censure3, n_power_b, a_)
    power_2b_plot = _censure_plot(power_2b_plot, idx_nan_b, censure3, n_power_b, a_)

    # Censure electric field
    n_power_e = len(power_2e_plot)
    power_ex_plot = _censure_plot(power_ex_plot, idx_nan_e, censure3, n_power_e, a_)
    power_ey_plot = _censure_plot(power_ey_plot, idx_nan_e, censure3, n_power_e, a_)
    power_ez_plot = _censure_plot(power_ez_plot, idx_nan_e, censure3, n_power_e, a_)
    power_2e_plot = _censure_plot(power_2e_plot, idx_nan_e, censure3, n_power_e, a_)
    power_2e_isr2_plot = _censure_plot(
        power_2e_isr2_plot, idx_nan_e, censure3, n_power_e, a_
    )

    # Censure poynting flux
    s_plot_x = _censure_plot(s_plot_x, idx_nan_b, censure3, n_power_b, a_)
    s_plot_x = _censure_plot(s_plot_x, idx_nan_e, censure3, n_power_e, a_)

    s_plot_y = _censure_plot(s_plot_y, idx_nan_b, censure3, n_power_b, a_)
    s_plot_y = _censure_plot(s_plot_y, idx_nan_e, censure3, n_power_e, a_)

    s_plot_z = _censure_plot(s_plot_z, idx_nan_b, censure3, n_power_b, a_)
    s_plot_z = _censure_plot(s_plot_z, idx_nan_e, censure3, n_power_e, a_)

    n_power_2e_isr2 = len(power_2e_isr2_plot)
    power_2e_isr2_plot = _censure_plot(
        power_2e_isr2_plot, idx_nan_eisr2, censure3, n_power_2e_isr2, a_
    )

    power_bx_plot = _average_data(power_bx_plot, in_time, out_time)
    power_by_plot = _average_data(power_by_plot, in_time, out_time)
    power_bz_plot = _average_data(power_bz_plot, in_time, out_time)
    power_2b_plot = _average_data(power_2b_plot, in_time, out_time)

    bb_xxyyzzss = _bb_xxyyzzss(
        power_bx_plot, power_by_plot, power_bz_plot, power_2b_plot
    )

    # Output
    res["t"] = unix2datetime64(out_time)
    res["f"] = frequency_vec[::-1]

    res["bb_xxyyzzss"] = xr.DataArray(
        bb_xxyyzzss[:, ::-1, ...],
        coords=[res["t"], res["f"], ["xx", "yy", "zz", "ss"]],
        dims=["time", "frequency", "comp"],
    )

    if want_ee:
        power_ex_plot = _average_data(power_ex_plot, in_time, out_time)
        power_ey_plot = _average_data(power_ey_plot, in_time, out_time)
        power_ez_plot = _average_data(power_ez_plot, in_time, out_time)
        power_2e_plot = _average_data(power_2e_plot, in_time, out_time)
        power_2e_isr2_plot = _average_data(power_2e_isr2_plot, in_time, out_time)
        power_2e_isr2_plot = np.real(power_2e_isr2_plot)

        s_plot_x = np.real(_average_data(s_plot_x, in_time, out_time))
        s_plot_y = np.real(_average_data(s_plot_y, in_time, out_time))
        s_plot_z = np.real(_average_data(s_plot_z, in_time, out_time))

        # TODO: check that it's correct (MATLAB weird stuff)
        s_azimuth, s_elevation, s_r = cart2sph(s_plot_x, s_plot_y, s_plot_z)

        ee_xxyyzzss = _ee_xxyyzzss(
            power_ex_plot, power_ey_plot, power_ez_plot, power_2e_plot
        )

        poynting_xyz = np.tile(s_plot_x, (3, 1, 1))
        poynting_xyz = np.transpose(poynting_xyz, [1, 2, 0])
        poynting_xyz[:, :, 1] = s_plot_y
        poynting_xyz[:, :, 2] = s_plot_z
        poynting_xyz = poynting_xyz.astype(np.float64)

        poynting_r_th_ph = np.tile(s_r, (3, 1, 1))
        poynting_r_th_ph = np.transpose(poynting_r_th_ph, [1, 2, 0])
        poynting_r_th_ph[..., 1] = np.pi / 2 - s_elevation
        poynting_r_th_ph[..., 2] = s_azimuth
        # Convert theta/phi to degrees.
        poynting_r_th_ph[..., 1:] = poynting_r_th_ph[..., 1:] * 180 / np.pi
        poynting_r_th_ph = poynting_r_th_ph.astype(np.float64)

        # Output
        res["ee_ss"] = power_2e_isr2_plot.astype(np.float64)

        res["ee_xxyyzzss"] = xr.DataArray(
            ee_xxyyzzss[:, ::-1, ...],
            coords=[res["t"], res["f"], ["xx", "yy", "zz", "ss"]],
            dims=["time", "frequency", "comp"],
        )

        res["pf_xyz"] = xr.DataArray(
            poynting_xyz[:, ::-1, ...],
            coords=[res["t"], res["f"], ["x", "y", "z"]],
            dims=["time", "frequency", "comp"],
        )

        res["pf_rtp"] = xr.DataArray(
            poynting_r_th_ph[:, ::-1, ...],
            coords=[res["t"], res["f"], ["rho", "theta", "phi"]],
            dims=["time", "frequency", "comp"],
        )

    if want_polarization:
        # Define parameters for which we cannot compute the wave vector
        with warnings.catch_warnings():
            warnings.simplefilter(action="ignore", category=RuntimeWarning)
            ind_low_planarity = planarity < 0.5
            ind_low_ellipticity = np.abs(ellipticity) < 0.2

        the_svd_fac[ind_low_planarity] = np.nan
        phi_svd_fac[ind_low_planarity] = np.nan
        the_svd_fac[ind_low_ellipticity] = np.nan
        phi_svd_fac[ind_low_ellipticity] = np.nan

        k_th_ph_svd_fac = np.zeros((the_svd_fac.shape[0], the_svd_fac.shape[1], 2))
        k_th_ph_svd_fac[..., 0] = the_svd_fac
        k_th_ph_svd_fac[..., 1] = phi_svd_fac

        # Output
        res["dop"] = xr.DataArray(
            np.real(dop_3d[:, ::-1]),
            coords=[res["t"], res["f"]],
            dims=["time", "frequency"],
        )

        res["dop2d"] = xr.DataArray(
            np.real(dop_2d[:, ::-1]),
            coords=[res["t"], res["f"]],
            dims=["time", "frequency"],
        )

        res["planarity"] = xr.DataArray(
            planarity[:, ::-1], coords=[res["t"], res["f"]], dims=["time", "frequency"]
        )

        res["ellipticity"] = xr.DataArray(
            ellipticity[:, ::-1],
            coords=[res["t"], res["f"]],
            dims=["time", "frequency"],
        )

        res["k_tp"] = xr.DataArray(
            k_th_ph_svd_fac[:, ::-1, ...],
            coords=[res["t"], res["f"], ["theta", "phi"]],
            dims=["time", "frequency", "comp"],
        )

    return res