Source code for pyrfu.mms.fk_power_spectrum_4sc

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Built-in imports
import bisect
import logging

# 3rd party imports
import numpy as np
import xarray as xr

# Local imports
from ..pyrf.avg_4sc import avg_4sc
from ..pyrf.resample import resample
from ..pyrf.time_clip import time_clip
from ..pyrf.wavelet import wavelet

__author__ = "Louis Richard"
__email__ = "louisr@irfu.se"
__copyright__ = "Copyright 2020-2023"
__license__ = "MIT"
__version__ = "2.4.2"
__status__ = "Prototype"

logging.captureWarnings(True)
logging.basicConfig(
    format="[%(asctime)s] %(levelname)s: %(message)s",
    datefmt="%d-%b-%y %H:%M:%S",
    level=logging.INFO,
)


[docs]def fk_power_spectrum_4sc( e, r, b, tints, cav: int = 8, num_k: int = 500, num_f: int = 200, df: float = None, w_width: int = 1, f_range: list = None, ): r"""Calculates the frequency-wave number power spectrum using the four MMS spacecraft. Uses a generalization of mms.fk_powerspectrum. Wavelet based cross-spectral analysis is used to calculate the phase difference each spacecraft pair and determine 3D wave vector. A generalization of the method used in mms.fk_powerspectrum to four point measurements. Parameters ---------- e : list of xarray.DataArray Fields to apply 4SC cross-spectral analysis to. e.g., e or b fields (if multiple components only the first is used). r : list of xarray.DataArray Positions of the four spacecraft. b : list of xarray.DataArray background magnetic field in the same coordinates as r. Used to determine the parallel and perpendicular wave numbers using 4SC average. tints : list of str Time interval over which the power spectrum is calculated. To avoid boundary effects use a longer time interval for e and b. cav : int, Optional Number of points in time series used to estimate phase. Default ``cav`` = 8. num_k : int, Optional Number of wave numbers used in spectrogram. Default ``num_k`` = 500. df : float, Optional Linear spacing of frequencies. Default ``df`` = None (log spacing). num_f : int, Optional Number of frequencies used in spectrogram. Default ``num_f`` = 200. w_width : float, Optional Multiplier for Morlet wavelet width. Default ``w_width`` = 1. f_range : list of float, Optional Frequency range for k-k plots. [minf maxf]. Returns ------- out : xarray.Dataset Dataset of array of powers as a function of frequency and wavenumber. Power is normalized to the maximum value. Notes ----- Wavelength must be larger than twice the spacecraft separations, otherwise spatial aliasing will occur. Examples -------- >>> from pyrfu.mms import get_data, fk_power_spectrum_4sc >>> from pyrfu.pyrf import extend_tint, convert_fac Load data >>> tint = ["2015-10-16T13:05:24.00", "2015-10-16T13:05:50.000"] >>> ic = range(1, 5) >>> b_fgm = [get_data("b_gse_fgm_brst_l2", tint, i) for i in ic] >>> b_scm = [get_data("b_gse_scm_brst_l2", tint, i) for i in ic] Load spacecraft position >>> tint_long = extend_tint(tint, [-60, 60]) >>> r_gse_mms = [get_data("r_gse", tint_long, i) for i in range(1, 5)] Convert magnetic field fluctuations to field aligned coordinates >>> b_fac = [convert_fac(b_s, b_f) for b_s, b_f in zip(b_scm, b_fgm)] >>> b_par = [b_s[:, 0] for b_s in b_fac] Compute dispersion relation >>> tint = ["2015-10-16T13:05:26.500", "2015-10-16T13:05:27.000"] >>> pwer = fk_power_spectrum_4sc(b_par, r_gse, b_fgm, tint, 4, 500, 2, ... 10, 2) """ ic = np.arange(1, 5) e = [resample(e[i - 1], e[0]) for i in ic] r = [resample(r[i - 1], e[0]) for i in ic] b = [resample(b[i - 1], e[0]) for i in ic] b_avg = avg_4sc(b) times = e[0].time use_linear = df is not None # idx = time_clip(e[0].time, list(tints)) # If odd, remove last data point (as is done in irf_wavelet) # if len(idx) % 2: # idx = idx[:-1] if use_linear: cwt_options = { "linear": df, "return_power": False, "wavelet_width": 5.36 * w_width, "cut_edge": False, } else: cwt_options = { "nf": num_f, "return_power": False, "wavelet_width": 5.36 * w_width, "cut_edge": False, } w = [wavelet(e[i], **cwt_options) for i in range(4)] num_f = len(w[0].frequency) times = time_clip(times, tints) nt = len(times) w = [time_clip(w[i], tints) for i in range(4)] fk_power = 0 for i in range(4): fk_power += (w[i].data * np.conj(w[i].data) / 4).astype(np.float64) n = int(np.floor(nt / cav) - 1) pos_av = cav / 2 + np.arange(n + 1) * cav av_times = times[pos_av.astype(np.int64)] b_avg = resample(b_avg, av_times) r = [resample(r[i], av_times) for i in range(4)] cx12, cx13, cx14 = [np.zeros((n + 1, num_f), dtype="complex128") for _ in range(3)] cx23, cx24, cx34 = [np.zeros((n + 1, num_f), dtype="complex128") for _ in range(3)] power_avg = np.zeros((n + 1, num_f), dtype="complex128") for m, pos_avm in enumerate(pos_av): lb, ub = [int(pos_avm - cav / 2), int(pos_avm + cav / 2)] cx12[m, :] = np.nanmean( w[0].data[lb:ub, :] * np.conj(w[1].data[lb:ub, :]), axis=0, ) cx13[m, :] = np.nanmean( w[0].data[lb:ub, :] * np.conj(w[2].data[lb:ub, :]), axis=0, ) cx14[m, :] = np.nanmean( w[0].data[lb:ub, :] * np.conj(w[3].data[lb:ub, :]), axis=0, ) cx23[m, :] = np.nanmean( w[1].data[lb:ub, :] * np.conj(w[2].data[lb:ub, :]), axis=0, ) cx24[m, :] = np.nanmean( w[1].data[lb:ub, :] * np.conj(w[3].data[lb:ub, :]), axis=0, ) cx34[m, :] = np.nanmean( w[2].data[lb:ub, :] * np.conj(w[3].data[lb:ub, :]), axis=0, ) power_avg[m, :] = np.nanmean(fk_power[lb:ub, :], axis=0) # Compute phase differences between each spacecraft pair th12 = np.arctan2(np.imag(cx12), np.real(cx12)) th13 = np.arctan2(np.imag(cx13), np.real(cx13)) th14 = np.arctan2(np.imag(cx14), np.real(cx14)) th23 = np.arctan2(np.imag(cx23), np.real(cx23)) th24 = np.arctan2(np.imag(cx24), np.real(cx24)) th34 = np.arctan2(np.imag(cx34), np.real(cx34)) w_mat = 2 * np.pi * np.tile(w[0].frequency.data, (n + 1, 1)) # Convert phase difference to time delay dt12, dt13, dt14, dt23, dt24, dt34 = [ th / w_mat for th in [th12, th13, th14, th23, th24, th34] ] # Weighted averaged time delay using all spacecraft pairs dt2 = ( 0.5 * dt12 + 0.2 * (dt13 - dt23) + 0.2 * (dt14 - dt24) + 0.1 * (dt14 - dt34 - dt23) ) dt3 = ( 0.5 * dt13 + 0.2 * (dt12 + dt23) + 0.2 * (dt14 - dt34) + 0.1 * (dt12 + dt24 - dt34) ) dt4 = ( 0.5 * dt14 + 0.2 * (dt12 + dt24) + 0.2 * (dt13 + dt34) + 0.1 * (dt12 + dt23 + dt34) ) # Compute phase speeds r = [r[i].data for i in range(4)] k_x, k_y, k_z = [np.zeros((n + 1, num_f)) for _ in range(3)] for ii in range(n + 1): dr = np.array( [ r[1][ii, :] - r[0][ii, :], r[2][ii, :] - r[0][ii, :], r[3][ii, :] - r[0][ii, :], ], ) for jj in range(num_f): m = np.linalg.solve(dr, [dt2[ii, jj], dt3[ii, jj], dt4[ii, jj]]) k_x[ii, jj] = 2 * np.pi * w[0].frequency[jj].data * m[0] k_y[ii, jj] = 2 * np.pi * w[0].frequency[jj].data * m[1] k_z[ii, jj] = 2 * np.pi * w[0].frequency[jj].data * m[2] k_x, k_y, k_z = [k / 1e3 for k in [k_x, k_y, k_z]] k_mag = np.linalg.norm(np.array([k_x, k_y, k_z]), axis=0) b_avg_x_mat = np.tile(b_avg.data[:, 0], (num_f, 1)).T b_avg_y_mat = np.tile(b_avg.data[:, 1], (num_f, 1)).T b_avg_z_mat = np.tile(b_avg.data[:, 2], (num_f, 1)).T b_avg_abs = np.linalg.norm(b_avg, axis=1) b_avg_abs_mat = np.tile(b_avg_abs, (num_f, 1)).T k_par = (k_x * b_avg_x_mat + k_y * b_avg_y_mat + k_z * b_avg_z_mat) / b_avg_abs_mat k_perp = np.sqrt(k_mag**2 - k_par**2) k_max = np.max(k_mag) * 1.1 k_min = -k_max k_vec = np.linspace(-k_max, k_max, num_k) k_mag_vec = np.linspace(0, k_max, num_k) dk_mag = k_max / num_k dk = 2 * k_max / num_k # Sort power into frequency and wave vector logging.info("Computing power versus kx,f; ky,f, kz,f") power_k_x_f, power_k_y_f, power_k_z_f = [np.zeros((num_f, num_k)) for _ in range(3)] power_k_mag_f = np.zeros((num_f, num_k)) for nn in range(num_f): k_x_number = np.floor((k_x[:, nn] - k_min) / dk).astype(np.int64) k_y_number = np.floor((k_y[:, nn] - k_min) / dk).astype(np.int64) k_z_number = np.floor((k_z[:, nn] - k_min) / dk).astype(np.int64) k_number = np.floor((k_mag[:, nn]) / dk_mag).astype(np.int64) power_k_x_f[nn, k_x_number] += np.real(power_avg[:, nn]) power_k_y_f[nn, k_y_number] += np.real(power_avg[:, nn]) power_k_z_f[nn, k_z_number] += np.real(power_avg[:, nn]) power_k_mag_f[nn, k_number] += np.real(power_avg[:, nn]) power_k_x_f /= np.max(power_k_x_f) power_k_y_f /= np.max(power_k_y_f) power_k_z_f /= np.max(power_k_z_f) power_k_mag_f /= np.max(power_k_mag_f) frequencies = w[0].frequency.data idx_f = np.arange(num_f) if f_range is not None: idx_min_freq = bisect.bisect_left(frequencies, np.min(f_range)) idx_max_freq = bisect.bisect_left(frequencies, np.max(f_range)) idx_f = idx_f[idx_min_freq:idx_max_freq] logging.info("Computing power versus kx,ky; kx,kz; ky,kz") power_k_x_k_y = np.zeros((num_k, num_k)) power_k_x_k_z = np.zeros((num_k, num_k)) power_k_y_k_z = np.zeros((num_k, num_k)) power_k_perp_k_par = np.zeros((num_k, num_k)) for nn in idx_f: k_x_number = np.floor((k_x[:, nn] - k_min) / dk).astype(np.int64) k_y_number = np.floor((k_y[:, nn] - k_min) / dk).astype(np.int64) k_z_number = np.floor((k_z[:, nn] - k_min) / dk).astype(np.int64) k_par_number = np.floor((k_par[:, nn] - k_min) / dk).astype(np.int64) k_perp_number = np.floor((k_perp[:, nn]) / dk_mag).astype(np.int64) power_k_x_k_y[k_y_number, k_x_number] += np.real(power_avg[:, nn]) power_k_x_k_z[k_z_number, k_x_number] += np.real(power_avg[:, nn]) power_k_y_k_z[k_z_number, k_y_number] += np.real(power_avg[:, nn]) power_k_perp_k_par[k_par_number, k_perp_number] += np.real( power_avg[:, nn], ) power_k_x_k_y /= np.max(power_k_x_k_y) power_k_x_k_z /= np.max(power_k_x_k_z) power_k_y_k_z /= np.max(power_k_y_k_z) power_k_perp_k_par /= np.max(power_k_perp_k_par) out_dict = { "k_x_f": (["k_x", "f"], power_k_x_f.T), "k_y_f": (["k_x", "f"], power_k_y_f.T), "k_z_f": (["k_x", "f"], power_k_z_f.T), "k_mag_f": (["k_mag", "f"], power_k_mag_f.T), "k_x_k_y": (["k_x", "k_y"], power_k_x_k_y.T), "k_x_k_z": (["kx", "kz"], power_k_x_k_z.T), "k_y_k_z": (["k_y", "k_z"], power_k_y_k_z.T), "k_perp_k_par": (["k_perp", "k_par"], power_k_perp_k_par.T), "k_x": k_vec, "k_y": k_vec, "k_z": k_vec, "k_mag": k_mag_vec, "k_perp": k_mag_vec, "k_par": k_vec, "f": frequencies, } out = xr.Dataset(out_dict) return out