Source code for pyrfu.mms.fk_power_spectrum_4sc

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Built-in imports
import bisect
import logging

# 3rd party imports
import numpy as np
import xarray as xr
from scipy import linalg

# Local imports
from pyrfu.pyrf.avg_4sc import avg_4sc
from pyrfu.pyrf.resample import resample
from pyrfu.pyrf.time_clip import time_clip
from pyrfu.pyrf.wavelet import wavelet

__author__ = "Louis Richard"
__email__ = "louisr@irfu.se"
__copyright__ = "Copyright 2020-2023"
__license__ = "MIT"
__version__ = "2.4.2"
__status__ = "Prototype"

logging.captureWarnings(True)
logging.basicConfig(
    format="[%(asctime)s] %(levelname)s: %(message)s",
    datefmt="%d-%b-%y %H:%M:%S",
    level=logging.INFO,
)


[docs]def fk_power_spectrum_4sc( e, r, b, tints, cav: int = 8, num_k: int = 500, num_f: int = 200, df: float = None, w_width: int = 1, f_range: list = None, ): r"""Calculates the frequency-wave number power spectrum using the four MMS spacecraft. Uses a generalization of mms.fk_powerspectrum. Wavelet based cross-spectral analysis is used to calculate the phase difference each spacecraft pair and determine 3D wave vector. A generalization of the method used in mms.fk_powerspectrum to four point measurements. Parameters ---------- e : list of xarray.DataArray Fields to apply 4SC cross-spectral analysis to. e.g., e or b fields (if multiple components only the first is used). r : list of xarray.DataArray Positions of the four spacecraft. b : list of xarray.DataArray background magnetic field in the same coordinates as r. Used to determine the parallel and perpendicular wave numbers using 4SC average. tints : list of str Time interval over which the power spectrum is calculated. To avoid boundary effects use a longer time interval for e and b. cav : int, Optional Number of points in time series used to estimate phase. Default ``cav`` = 8. num_k : int, Optional Number of wave numbers used in spectrogram. Default ``num_k`` = 500. df : float, Optional Linear spacing of frequencies. Default ``df`` = None (log spacing). num_f : int, Optional Number of frequencies used in spectrogram. Default ``num_f`` = 200. w_width : float, Optional Multiplier for Morlet wavelet width. Default ``w_width`` = 1. f_range : list of float, Optional Frequency range for k-k plots. [minf maxf]. Returns ------- out : xarray.Dataset Dataset of array of powers as a function of frequency and wavenumber. Power is normalized to the maximum value. Notes ----- Wavelength must be larger than twice the spacecraft separations, otherwise spatial aliasing will occur. Examples -------- >>> from pyrfu.mms import get_data, fk_power_spectrum_4sc >>> from pyrfu.pyrf import extend_tint, convert_fac Load data >>> tint = ["2015-10-16T13:05:24.00", "2015-10-16T13:05:50.000"] >>> ic = range(1, 5) >>> b_fgm = [get_data("b_gse_fgm_brst_l2", tint, i) for i in ic] >>> b_scm = [get_data("b_gse_scm_brst_l2", tint, i) for i in ic] Load spacecraft position >>> tint_long = extend_tint(tint, [-60, 60]) >>> r_gse_mms = [get_data("r_gse", tint_long, i) for i in range(1, 5)] Convert magnetic field fluctuations to field aligned coordinates >>> b_fac = [convert_fac(b_s, b_f) for b_s, b_f in zip(b_scm, b_fgm)] >>> b_para = [b_s[:, 0] for b_s in b_fac] Compute dispersion relation >>> tint = ["2015-10-16T13:05:26.500", "2015-10-16T13:05:27.000"] >>> pwer = fk_power_spectrum_4sc(b_para, r_gse, b_fgm, tint, 4, 500, 2, ... 10, 2) """ ic = np.arange(1, 5) # Resample magnetic field to common time base # and compute 4SC average b = [resample(b[i - 1], b[0]) for i in ic] b_avg = avg_4sc(b) # Resample e and r to common time base e = [resample(e[i - 1], e[0]) for i in ic] r = [resample(r[i - 1], e[0]) for i in ic] times = e[0].time use_linear = df is not None if use_linear: cwt_options = { "linear": df, "return_power": False, "wavelet_width": 5.36 * w_width, "cut_edge": False, } else: cwt_options = { "n_freqs": num_f, "return_power": False, "wavelet_width": 5.36 * w_width, "cut_edge": False, } w = [wavelet(e[i], **cwt_options) for i in range(4)] num_f = len(w[0].frequency) times = time_clip(times, tints) nt = len(times) # Ensure even number of time points if nt % 2: times = times[:-1] nt -= 1 # Clip wavelet transforms to time interval w = [w[i].sel(time=times) for i in range(4)] # Compute averaged power spectrum from all spacecraft fk_power = np.zeros((nt, num_f), dtype=np.float64) for i in range(4): fk_power += (w[i].data * np.conj(w[i].data) / 4).astype(np.float64) # Find the time positions for averaging cav = int(cav) n = int(np.floor(nt / cav) - 1) pos_av = cav // 2 + np.arange(n + 1) * cav - 1 av_times = times[pos_av] # Resample background magnetic field and spacecraft positions to # averaged time positions b_avg = resample(b_avg, av_times) r = [resample(r[i], av_times) for i in range(4)] cx12, cx13, cx14 = [np.zeros((n + 1, num_f), dtype="complex128") for _ in range(3)] cx23, cx24, cx34 = [np.zeros((n + 1, num_f), dtype="complex128") for _ in range(3)] power_avg = np.zeros((n + 1, num_f), dtype="float64") for m, pos_avm in enumerate(pos_av): lb, ub = [pos_avm - cav // 2 + 1, pos_avm + cav // 2 + 1] cx12[m, :] = np.nanmean( w[0].data[lb:ub, :] * np.conj(w[1].data[lb:ub, :]), axis=0, ) cx13[m, :] = np.nanmean( w[0].data[lb:ub, :] * np.conj(w[2].data[lb:ub, :]), axis=0, ) cx14[m, :] = np.nanmean( w[0].data[lb:ub, :] * np.conj(w[3].data[lb:ub, :]), axis=0, ) cx23[m, :] = np.nanmean( w[1].data[lb:ub, :] * np.conj(w[2].data[lb:ub, :]), axis=0, ) cx24[m, :] = np.nanmean( w[1].data[lb:ub, :] * np.conj(w[3].data[lb:ub, :]), axis=0, ) cx34[m, :] = np.nanmean( w[2].data[lb:ub, :] * np.conj(w[3].data[lb:ub, :]), axis=0, ) power_avg[m, :] = np.nanmean(fk_power[lb:ub, :], axis=0) # Compute phase differences between each spacecraft pair th12 = np.arctan2(np.imag(cx12), np.real(cx12)) th13 = np.arctan2(np.imag(cx13), np.real(cx13)) th14 = np.arctan2(np.imag(cx14), np.real(cx14)) th23 = np.arctan2(np.imag(cx23), np.real(cx23)) th24 = np.arctan2(np.imag(cx24), np.real(cx24)) th34 = np.arctan2(np.imag(cx34), np.real(cx34)) w_mat = 2 * np.pi * np.tile(w[0].frequency.data, (n + 1, 1)) # Convert phase difference to time delay dt12 = th12 / w_mat dt13 = th13 / w_mat dt14 = th14 / w_mat dt23 = th23 / w_mat dt24 = th24 / w_mat dt34 = th34 / w_mat # Weighted averaged time delay using all spacecraft pairs dt2 = ( 0.5 * dt12 + 0.2 * (dt13 - dt23) + 0.2 * (dt14 - dt24) + 0.1 * (dt14 - dt34 - dt23) ) dt3 = ( 0.5 * dt13 + 0.2 * (dt12 + dt23) + 0.2 * (dt14 - dt34) + 0.1 * (dt12 + dt24 - dt34) ) dt4 = ( 0.5 * dt14 + 0.2 * (dt12 + dt24) + 0.2 * (dt13 + dt34) + 0.1 * (dt12 + dt23 + dt34) ) # Compute phase speeds r = [r[i].data for i in range(4)] k_x, k_y, k_z = [np.zeros((n + 1, num_f)) for _ in range(3)] for ii in range(n + 1): dr = np.array( [ r[1][ii, :] - r[0][ii, :], r[2][ii, :] - r[0][ii, :], r[3][ii, :] - r[0][ii, :], ], ) for jj in range(num_f): m = linalg.solve(dr, np.array([dt2[ii, jj], dt3[ii, jj], dt4[ii, jj]])) k_x[ii, jj] = 2 * np.pi * w[0].frequency[jj].data * m[0] k_y[ii, jj] = 2 * np.pi * w[0].frequency[jj].data * m[1] k_z[ii, jj] = 2 * np.pi * w[0].frequency[jj].data * m[2] k_x, k_y, k_z = [k / 1e3 for k in [k_x, k_y, k_z]] k_mag = np.linalg.norm(np.array([k_x, k_y, k_z]), axis=0) # Compute the parallel and perpendicular wave numbers using the background magnetic # field averaged over the four spacecraft. This is used to b_avg_x_mat = np.tile(b_avg.data[:, 0], (num_f, 1)).T b_avg_y_mat = np.tile(b_avg.data[:, 1], (num_f, 1)).T b_avg_z_mat = np.tile(b_avg.data[:, 2], (num_f, 1)).T b_avg_abs = np.linalg.norm(b_avg, axis=1) b_avg_abs_mat = np.tile(b_avg_abs, (num_f, 1)).T k_para = (k_x * b_avg_x_mat + k_y * b_avg_y_mat + k_z * b_avg_z_mat) / b_avg_abs_mat k_perp = np.sqrt(k_mag**2 - k_para**2) # Determine the maximum and minimum wave numbers k_max = np.max(k_mag) * 1.1 k_min = -k_max k_vec = np.linspace(-k_max, k_max, num_k) k_mag_vec = np.linspace(0, k_max, num_k) dk_mag = k_max / num_k dk = 2 * k_max / num_k # Sort power into frequency and wave vector logging.info("Computing power versus (kx,f); (ky,f), (kz,f), (k,f)") power_k_x_f, power_k_y_f, power_k_z_f = [np.zeros((num_f, num_k)) for _ in range(3)] power_k_mag_f = np.zeros((num_f, num_k)) for mm in range(n + 1): for nn in range(num_f): k_x_number = int(np.floor((k_x[mm, nn] - k_min) / dk)) k_y_number = int(np.floor((k_y[mm, nn] - k_min) / dk)) k_z_number = int(np.floor((k_z[mm, nn] - k_min) / dk)) k_number = int(np.floor(k_mag[mm, nn] / dk_mag)) power_k_x_f[nn, k_x_number] += power_avg[mm, nn] power_k_y_f[nn, k_y_number] += power_avg[mm, nn] power_k_z_f[nn, k_z_number] += power_avg[mm, nn] power_k_mag_f[nn, k_number] += power_avg[mm, nn] # Normalize power to maximum value for plotting power_k_x_f /= np.max(power_k_x_f) power_k_y_f /= np.max(power_k_y_f) power_k_z_f /= np.max(power_k_z_f) power_k_mag_f /= np.max(power_k_mag_f) frequencies = w[0].frequency.data idx_f = np.arange(num_f) if f_range is not None: idx_min_freq = bisect.bisect_left(frequencies, np.min(f_range)) idx_max_freq = bisect.bisect_left(frequencies, np.max(f_range)) idx_f = idx_f[idx_min_freq:idx_max_freq] # Sort power into wave vector space for k_x, k_y; k_x, k_z; k_y, k_z logging.info("Computing power versus (kx,ky); (kx,kz); (ky,kz)") power_k_x_k_y = np.zeros((num_k, num_k)) power_k_x_k_z = np.zeros((num_k, num_k)) power_k_y_k_z = np.zeros((num_k, num_k)) for mm in range(n + 1): for nn in idx_f: # Find the position of the power in the # k_x, k_y; k_x, k_z; k_y, k_z space k_x_number = int(np.floor((k_x[mm, nn] - k_min) / dk)) k_y_number = int(np.floor((k_y[mm, nn] - k_min) / dk)) k_z_number = int(np.floor((k_z[mm, nn] - k_min) / dk)) # Add the power to the corresponding position in the # k_x, k_y; k_x, k_z; k_y, k_z space power_k_x_k_y[k_y_number, k_x_number] += power_avg[mm, nn] power_k_x_k_z[k_z_number, k_x_number] += power_avg[mm, nn] power_k_y_k_z[k_z_number, k_y_number] += power_avg[mm, nn] # Normalize power to maximum value for plotting power_k_x_k_y /= np.max(power_k_x_k_y) power_k_x_k_z /= np.max(power_k_x_k_z) power_k_y_k_z /= np.max(power_k_y_k_z) # Sort power into wave vector space for k_perp, k_para logging.info("Computing power versus kperp,kpara") power_k_perp_k_para = np.zeros((num_k, num_k)) for mm in range(n + 1): for nn in idx_f: # Find the position of the power in the k_para, k_perp space k_para_number = int(np.floor((k_para[mm, nn] - k_min) / dk)) k_perp_number = int(np.floor((k_perp[mm, nn]) / dk_mag)) # Add the power to the corresponding position in the k_para, k_perp space power_k_perp_k_para[k_para_number, k_perp_number] += power_avg[mm, nn] # Normalize power to maximum value for plotting power_k_perp_k_para /= np.max(power_k_perp_k_para) # Set zero power to NaN for plotting purposes power_k_x_f[power_k_x_f == 0] = np.nan power_k_y_f[power_k_y_f == 0] = np.nan power_k_z_f[power_k_z_f == 0] = np.nan power_k_mag_f[power_k_mag_f == 0] = np.nan power_k_x_k_y[power_k_x_k_y == 0] = np.nan power_k_x_k_z[power_k_x_k_z == 0] = np.nan power_k_y_k_z[power_k_y_k_z == 0] = np.nan power_k_perp_k_para[power_k_perp_k_para == 0] = np.nan # Find the wave numbers and frequencies weighted by power # kmag, f k_powers = np.nansum(power_k_mag_f[idx_f, :], axis=0) f_powers = np.nansum(power_k_mag_f[idx_f, :], axis=1) f_avg = np.nansum(frequencies[idx_f] * f_powers) / np.nansum(f_powers) f_std = np.sqrt( np.nansum((frequencies[idx_f] - f_avg) ** 2 * f_powers) / np.nansum(f_powers) ) k_avg = np.nansum(k_mag_vec * k_powers) / np.nansum(k_powers) k_std = np.sqrt( np.nansum((k_mag_vec - k_avg) ** 2 * k_powers) / np.nansum(k_powers) ) # kpara, kperp kpara_powers = np.nansum(power_k_perp_k_para, axis=1) kperp_powers = np.nansum(power_k_perp_k_para, axis=0) kpar_avg = np.nansum(k_vec * kpara_powers) / np.nansum(kpara_powers) kpar_std = np.sqrt( np.nansum((k_vec - kpar_avg) ** 2 * kpara_powers) / np.nansum(kpara_powers) ) kperp_avg = np.nansum(k_mag_vec * kperp_powers) / np.nansum(kperp_powers) kperp_std = np.sqrt( np.nansum((k_mag_vec - kperp_avg) ** 2 * kperp_powers) / np.nansum(kperp_powers) ) # kx, ky, kz kx_powers = np.nansum(power_k_x_k_y, axis=0) ky_powers = np.nansum(power_k_x_k_y, axis=1) kz_powers = np.nansum(power_k_x_k_z, axis=1) kx_avg = np.nansum(k_vec * kx_powers) / np.nansum(kx_powers) kx_std = np.sqrt( np.nansum((k_vec - kx_avg) ** 2 * kx_powers) / np.nansum(kx_powers) ) ky_avg = np.nansum(k_vec * ky_powers) / np.nansum(ky_powers) ky_std = np.sqrt( np.nansum((k_vec - ky_avg) ** 2 * ky_powers) / np.nansum(ky_powers) ) kz_avg = np.nansum(k_vec * kz_powers) / np.nansum(kz_powers) kz_std = np.sqrt( np.nansum((k_vec - kz_avg) ** 2 * kz_powers) / np.nansum(kz_powers) ) attrs = { "f_avg": f_avg, "f_std": f_std, "k_mag_avg": k_avg, "k_mag_std": k_std, "k_para_avg": kpar_avg, "k_para_std": kpar_std, "k_perp_avg": kperp_avg, "k_perp_std": kperp_std, "k_x_avg": kx_avg, "k_x_std": kx_std, "k_y_avg": ky_avg, "k_y_std": ky_std, "k_z_avg": kz_avg, "k_z_std": kz_std, } out_dict = { "k_x_f": (["k_x", "f"], power_k_x_f.T), "k_y_f": (["k_x", "f"], power_k_y_f.T), "k_z_f": (["k_x", "f"], power_k_z_f.T), "k_mag_f": (["k_mag", "f"], power_k_mag_f.T), "k_x_k_y": (["k_x", "k_y"], power_k_x_k_y.T), "k_x_k_z": (["k_x", "k_z"], power_k_x_k_z.T), "k_y_k_z": (["k_y", "k_z"], power_k_y_k_z.T), "k_perp_k_para": (["k_perp", "k_para"], power_k_perp_k_para.T), "k_x": k_vec, "k_y": k_vec, "k_z": k_vec, "k_mag": k_mag_vec, "k_perp": k_mag_vec, "k_para": k_vec, "f": frequencies, } out = xr.Dataset(out_dict, attrs=attrs) return out