Source code for pyrfu.pyrf.wavepolarize_means

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Built-in imports
import logging

# 3rd party imports
import numpy as np
import xarray as xr

from .datetime642unix import datetime642unix

# Local imports
from .resample import resample

__author__ = "Louis Richard"
__email__ = "louisr@irfu.se"
__copyright__ = "Copyright 2020-2023"
__license__ = "MIT"
__version__ = "2.4.2"
__status__ = "Prototype"

logging.captureWarnings(True)
logging.basicConfig(
    format="[%(asctime)s] %(levelname)s: %(message)s",
    datefmt="%d-%b-%y %H:%M:%S",
    level=logging.INFO,
)


def _spec_mat(half_spec_x, half_spec_y, half_spec_z):
    r"""CALCULATION OF THE SPECTRAL MATRIX"""

    spec_mat = np.zeros([len(half_spec_x), 3, 3])
    spec_mat[:, 0, 0] = half_spec_x * np.conj(half_spec_x)
    spec_mat[:, 1, 0] = half_spec_x * np.conj(half_spec_y)
    spec_mat[:, 2, 0] = half_spec_x * np.conj(half_spec_z)
    spec_mat[:, 0, 1] = half_spec_y * np.conj(half_spec_x)
    spec_mat[:, 1, 1] = half_spec_y * np.conj(half_spec_y)
    spec_mat[:, 2, 1] = half_spec_y * np.conj(half_spec_z)
    spec_mat[:, 0, 2] = half_spec_z * np.conj(half_spec_x)
    spec_mat[:, 1, 2] = half_spec_z * np.conj(half_spec_y)
    spec_mat[:, 2, 2] = half_spec_z * np.conj(half_spec_z)

    return spec_mat


[docs]def wavepolarize_means( b_wave, b_bgd, min_psd: float = 1e-25, nop_fft: int = 256, ): r"""Analysis the polarization of magnetic wave using "means" method Parameters ----------- b_wave : xarray.DataArray Time series of the magnetic field from Search Coil Magnetometer (SCM). b_bgd : xarray.DataArray Time series of the magnetic field from Flux Gate Magnetometer (FGM). min_psd : float, Optional Threshold for the analysis (e.g 1.0e-7). Below this value, the SVD analysis is meaningless if min_psd is not given, SVD analysis will be done for all waves. Default ``min_psd`` = 1e-25. nop_fft : int, Optional Number of points in FFT. Default is 256. Returns ------- b_psd : xarray.DataArray Power spectrum density of magnetic filed wave. wave_angle : xarray.DataArray (form 0 to 90) deg_pol : xarray.DataArray Spectrogram of the degree of polarization (form 0 to 1). elliptict : xarray.DataArray Spectrogram of the ellipticity (form -1 to 1) helict : xarray.DataArray Spectrogram of the helicity (form -1 to 1) Notes ------ ``b_wave`` and ``b_bgd`` should be from the same satellite and in the same coordinates .. warning:: If one component is an order of magnitude or more greater than the other two then the polarization results saturate and erroneously indicate high degrees of polarization at all times and frequencies. Time series should be eyeballed before running the program. For time series containing very rapid changes or spikes the usual problems with Fourier analysis arise. Care should be taken in evaluating degree of polarization results. For meaningful results there should be significant wave power at the frequency where the polarization approaches 100%. Remember comparing two straight lines yields 100% polarization. Examples -------- >>> from pyrfu import pyrf >>> polarization = pyrf.wavepolarize_means(b_wave, b_bgd) >>> polarization = pyrf.wavepolarize_means(b_wave, b_bgd, 1.0e-7) >>> polarization = pyrf.wavepolarize_means(b_wave, b_bgd, 1.0e-7, 256) """ step_length = int(nop_fft / 2) n_pts = len(b_wave) # total number of FFTs n_stp = int((n_pts - nop_fft) / step_length) # No. of bins in frequency domain n_bin = 7 # Smoothing profile based on Hanning aa = np.array([0.024, 0.093, 0.232, 0.301, 0.232, 0.093, 0.024]) # change wave to MFA coordinates b_bgd = resample(b_bgd, b_wave) b_x, b_y, b_z = [np.zeros(len(b_wave)) for _ in range(3)] for ii in range(len(b_wave)): nb = b_bgd[ii, :] / np.linalg.norm(b_bgd[ii, :]) n_perp1 = np.cross(nb, [0, 1, 0]) n_perp1 = n_perp1 / np.linalg.norm(n_perp1) n_perp2 = np.cross(nb, n_perp1) b_z[ii] = np.sum(b_wave[ii, :] * nb) b_x[ii] = np.sum(b_wave[ii, :] * n_perp1) b_y[ii] = np.sum(b_wave[ii, :] * n_perp2) ct = datetime642unix(b_wave.time.data) # DEFINE ARRAYS xs, ys, zs = [b_x, b_y, b_z] sample_freq = 1 / (ct[1] - ct[0]) end_sample_freq = 1 / (ct[-1] - ct[-2]) if sample_freq != end_sample_freq: logging.info( "file sampling frequency changes %(sample_freq)3.2f Hz " "to %(end_sample_freq)3.2f Hz", {"sample_freq": sample_freq, "end_sample_freq": end_sample_freq}, ) else: logging.info("ac file sampling frequency %3.2f Hz", sample_freq) # FFT calculation # Minimum variance direction and wave normal angle wave_angle = np.zeros([n_stp, int(nop_fft / 2)]) # Degree of Polarization sqrd_mat = np.zeros([n_stp, int(nop_fft / 2), 3, 3]) deg_pol = np.zeros([n_stp, int(nop_fft / 2)]) trace_spec_mat = np.zeros([n_stp, int(nop_fft / 2)]) # HELICITY, ELLIPTICITY AND THE WAVE STATE VECTOR lambda_u = np.zeros([n_stp, int(nop_fft / 2), 3, 3]) helic, ellip = [np.zeros([n_stp, int(nop_fft / 2), 3]) for _ in range(2)] smooth = np.zeros(nop_fft) for j in range(n_stp): # FFT CALCULATION smooth = 0.08 + 0.46 * ( 1 - np.cos(2 * np.pi * np.arange(1, nop_fft + 1) / nop_fft) ) temp_x = ( smooth * xs[((j - 1) * step_length + 1) : ((j - 1) * step_length + nop_fft)] ) temp_y = ( smooth * ys[((j - 1) * step_length + 1) : ((j - 1) * step_length + nop_fft)] ) temp_z = ( smooth * zs[((j - 1) * step_length + 1) : ((j - 1) * step_length + nop_fft)] ) spec_x = np.fft.fft(temp_x) spec_y = np.fft.fft(temp_y) spec_z = np.fft.fft(temp_z) half_spec_x = spec_x[: (nop_fft / 2)] half_spec_y = spec_y[: (nop_fft / 2)] half_spec_z = spec_z[: (nop_fft / 2)] xs = np.roll(xs, -step_length) ys = np.roll(ys, -step_length) zs = np.roll(zs, -step_length) # CALCULATION OF THE SPECTRAL MATRIX spec_mat = _spec_mat(half_spec_x, half_spec_y, half_spec_z) # Calculation of smoothed spectral matrix e_spec_mat = np.nan * np.ones(spec_mat.shape) off_idx = int((n_bin - 1) / 2) for k in range(off_idx, int(nop_fft / 2) - off_idx): for ir in range(3): for ic in range(3): e_spec_mat[k, ir, ic] = np.sum( aa[:n_bin] * spec_mat[(k - off_idx) : (k + off_idx), ir, ic], ) # Calculation of the minimum variance direction and wave normal angle aaa2 = np.imag(e_spec_mat[:, 0, 1]) ** 2 aaa2 += np.imag(e_spec_mat[:, 0, 2]) ** 2 aaa2 += np.imag(e_spec_mat[:, 1, 2]) ** 2 aaa2 = np.sqrt(aaa2[j, :]) wn_x = -np.abs(np.imag(e_spec_mat[:, 1, 2]) / aaa2) wn_y = -np.abs(np.imag(e_spec_mat[:, 0, 2]) / aaa2) wn_z = np.imag(e_spec_mat[:, 0, 1]) / aaa2 wave_angle[j, :] = np.arctan( np.sqrt(wn_x**2 + wn_y**2) / np.abs(wn_z), ) # CALCULATION OF THE DEGREE OF POLARISATION # calc of square of smoothed spec matrix for ir in range(3): for ic in range(3): sqrd_mat[:, ir, ic] = e_spec_mat[:, ir, 0] * e_spec_mat[:, 0, ic] sqrd_mat[:, ir, ic] += e_spec_mat[:, ir, 1] * e_spec_mat[:, 1, ic] sqrd_mat[:, ir, ic] += e_spec_mat[:, ir, 2] * e_spec_mat[:, 2, ic] trace_sqrd_mat = sqrd_mat[:, 0, 0] + sqrd_mat[:, 1, 1] + sqrd_mat[:, 2, 2] trace_spec_mat[j, :] = ( e_spec_mat[:, 0, 0] + e_spec_mat[:, 1, 1] + e_spec_mat[:, 2, 2] ) deg_pol[j, :] = trace_spec_mat[j, :] * np.nan deg_pol[j, off_idx : int(nop_fft / 2) - off_idx] = ( 3 * trace_sqrd_mat[off_idx : int(nop_fft / 2) - off_idx] ) deg_pol[j, off_idx : int(nop_fft / 2) - off_idx] -= ( trace_spec_mat[j, off_idx : int(nop_fft / 2) - off_idx] ** 2 ) deg_pol[j, off_idx : int(nop_fft / 2) - off_idx] /= ( 2 * trace_spec_mat[j, off_idx : int(nop_fft / 2) - off_idx] ** 2 ) # Calculation of helicity, ellipticity and the wave state vector alpha_x = np.sqrt(e_spec_mat[:, 0, 0]) alpha_y = np.sqrt(e_spec_mat[:, 1, 1]) alpha_z = np.sqrt(e_spec_mat[:, 2, 2]) alpha_cos1_x = np.real(e_spec_mat[:, 0, 1]) / np.sqrt( e_spec_mat[:, 0, 0], ) alpha_sin1_x = -np.imag(e_spec_mat[j, :, 0, 1]) / np.sqrt( e_spec_mat[:, 0, 0], ) alpha_cos2_x = np.real(e_spec_mat[:, 0, 2]) / np.sqrt( e_spec_mat[:, 0, 0], ) alpha_sin2_x = -np.imag(e_spec_mat[j, :, 0, 2]) / np.sqrt( e_spec_mat[:, 0, 0], ) alpha_cos1_y = np.real(e_spec_mat[:, 1, 0]) / np.sqrt( e_spec_mat[:, 1, 1], ) alpha_sin1_y = -np.imag(e_spec_mat[:, 1, 0]) / np.sqrt( e_spec_mat[:, 1, 1], ) alpha_cos2_y = np.real(e_spec_mat[:, 1, 2]) / np.sqrt( e_spec_mat[:, 1, 1], ) alpha_sin2_y = -np.imag(e_spec_mat[:, 1, 2]) / np.sqrt( e_spec_mat[:, 1, 1], ) alpha_cos1_z = np.real(e_spec_mat[:, 2, 0]) / np.sqrt( e_spec_mat[:, 2, 2], ) alpha_sin1_z = -np.imag(e_spec_mat[:, 2, 0]) / np.sqrt( e_spec_mat[:, 2, 2], ) alpha_cos2_z = np.real(e_spec_mat[:, 2, 1]) / np.sqrt( e_spec_mat[:, 2, 2], ) alpha_sin2_z = -np.imag(e_spec_mat[:, 2, 1]) / np.sqrt( e_spec_mat[:, 2, 2], ) lambda_u[:, 0, 0] = alpha_x lambda_u[:, 1, 0] = alpha_y lambda_u[:, 2, 0] = alpha_z lambda_u[:, 0, 1] = alpha_cos1_x + alpha_sin1_x * 1j lambda_u[:, 0, 2] = alpha_cos2_x + alpha_sin2_x * 1j lambda_u[:, 1, 1] = alpha_cos1_y + alpha_sin1_y * 1j lambda_u[:, 1, 2] = alpha_cos2_y + alpha_sin2_y * 1j lambda_u[:, 2, 1] = alpha_cos1_z + alpha_sin1_z * 1j lambda_u[:, 2, 2] = alpha_cos2_z + alpha_sin2_z * 1j for k in range(int(nop_fft / 2)): for xyz in range(3): # HELICITY CALCULATION upper = np.sum( 2 * np.real(lambda_u[k, xyz, :3]) * (np.imag(lambda_u[k, xyz, :3])), ) lower = np.sum( (np.real(lambda_u[k, xyz, :3])) ** 2 - (np.imag(lambda_u[k, xyz, :3])) ** 2, ) if upper[j, k] > 0: gamma = np.arctan(upper[j, k] / lower[j, k]) else: gamma = np.pi + (np.pi + np.arctan(upper[j, k] / lower[j, k])) lambda_u[k, xyz, :] = np.exp(-0.5 * gamma * 1j) * lambda_u[k, xyz, :] helic[j, k, xyz] = np.sqrt( np.sum(np.real(lambda_u[k, xyz, :3]) ** 2), ) helic[j, k, xyz] /= np.sqrt( np.sum(np.imag(lambda_u[k, xyz, :3]) ** 2), ) helic[j, k, xyz] = np.divide(1, helic[j, k, xyz]) # ELLIPTICITY CALCULATION upper_e = np.sum( np.imag(lambda_u[k, xyz, :3]) * np.real(lambda_u[k, xyz, :3]), ) lower_e = np.sum(np.real(lambda_u[k, xyz, :2]) ** 2) - np.sum( np.imag(lambda_u[k, xyz, :2]) ** 2, ) if upper_e > 0: gamma_rot = np.arctan(upper_e / lower_e) else: gamma_rot = np.pi + np.pi + np.arctan(upper_e / lower_e) lam = lambda_u[k, xyz, :2] lambda_u_rot = np.exp(-0.5 * gamma_rot * 1j) * lam ellip[j, k, xyz] = np.sqrt(np.sum(np.imag(lambda_u_rot) ** 2)) ellip[j, k, xyz] /= np.sqrt( np.sum(np.real(lambda_u_rot) ** 2), ) ellip[j, k, xyz] *= -( np.imag(e_spec_mat[k, 0, 1]) * np.sin(wave_angle[j, k]) ) ellip[j, k, xyz] /= np.abs( np.imag(e_spec_mat[k, 0, 1]) * np.sin(wave_angle[j, k]), ) # AVERAGING HELICITY AND ELLIPTICITY RESULTS ellipticity = np.mean(ellip, axis=-1) helicity = np.mean(helic, axis=-1) # CREATING OUTPUT PARAMETER time_line = ( ct[0] + np.abs(nop_fft / 2) / sample_freq + np.arange(1, n_stp + 1) * step_length / sample_freq ) bin_width = sample_freq / nop_fft freq_line = bin_width * np.arange(1, nop_fft / 2 + 1) # scaling power results to units with meaning W = nop_fft * np.sum(smooth**2) power_spec = np.zeros([n_stp, int(nop_fft / 2)]) power_spec[:, 1 : nop_fft / 2 - 1] = ( 1 / W * 2 * trace_spec_mat[:, 1 : nop_fft / 2 - 1] / bin_width ) power_spec[:, 1] = 1 / W * trace_spec_mat[:, 0] / bin_width power_spec[:, nop_fft / 2] = 1 / W * trace_spec_mat[:, nop_fft / 2] / bin_width # KICK OUT THE ANALYSIS OF THE WEAK SIGNALS wave_angle[power_spec < min_psd] = np.nan deg_pol[power_spec < min_psd] = np.nan ellipticity[power_spec < min_psd] = np.nan helicity[power_spec < min_psd] = np.nan # Save as DataArrays b_psd = xr.DataArray( power_spec, coords=[time_line, freq_line], dims=["t", "f"], ) wave_angle = xr.DataArray( wave_angle * 180 / np.pi, coords=[time_line, freq_line], dims=["t", "f"], ) ellipticity = xr.DataArray( ellipticity, coords=[time_line, freq_line], dims=["t", "f"], ) deg_pol = xr.DataArray( deg_pol, coords=[time_line, freq_line], dims=["t", "f"], ) helicity = xr.DataArray( helicity, coords=[time_line, freq_line], dims=["t", "f"], ) b_psd.f.attrs["units"] = "Hz" wave_angle.f.attrs["units"] = "Hz" deg_pol.f.attrs["units"] = "Hz" ellipticity.f.attrs["units"] = "Hz" helicity.f.attrs["units"] = "Hz" return b_psd, wave_angle, deg_pol, ellipticity, helicity