#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Built-in imports
import bisect
import logging
# 3rd party imports
import numpy as np
import xarray as xr
from scipy import linalg
# Local imports
from pyrfu.pyrf.avg_4sc import avg_4sc
from pyrfu.pyrf.resample import resample
from pyrfu.pyrf.time_clip import time_clip
from pyrfu.pyrf.wavelet import wavelet
__author__ = "Louis Richard"
__email__ = "louisr@irfu.se"
__copyright__ = "Copyright 2020-2023"
__license__ = "MIT"
__version__ = "2.4.2"
__status__ = "Prototype"
logging.captureWarnings(True)
logging.basicConfig(
format="[%(asctime)s] %(levelname)s: %(message)s",
datefmt="%d-%b-%y %H:%M:%S",
level=logging.INFO,
)
[docs]def fk_power_spectrum_4sc(
e,
r,
b,
tints,
cav: int = 8,
num_k: int = 500,
num_f: int = 200,
df: float = None,
w_width: int = 1,
f_range: list = None,
):
r"""Calculates the frequency-wave number power spectrum using the four
MMS spacecraft. Uses a generalization of mms.fk_powerspectrum. Wavelet
based cross-spectral analysis is used to calculate the phase difference
each spacecraft pair and determine 3D wave vector. A generalization of
the method used in mms.fk_powerspectrum to four point measurements.
Parameters
----------
e : list of xarray.DataArray
Fields to apply 4SC cross-spectral analysis to. e.g., e or b fields
(if multiple components only the first is used).
r : list of xarray.DataArray
Positions of the four spacecraft.
b : list of xarray.DataArray
background magnetic field in the same coordinates as r. Used to
determine the parallel and perpendicular wave numbers using 4SC
average.
tints : list of str
Time interval over which the power spectrum is calculated. To avoid
boundary effects use a longer time interval for e and b.
cav : int, Optional
Number of points in time series used to estimate phase.
Default ``cav`` = 8.
num_k : int, Optional
Number of wave numbers used in spectrogram. Default ``num_k`` = 500.
df : float, Optional
Linear spacing of frequencies. Default ``df`` = None (log spacing).
num_f : int, Optional
Number of frequencies used in spectrogram. Default ``num_f`` = 200.
w_width : float, Optional
Multiplier for Morlet wavelet width. Default ``w_width`` = 1.
f_range : list of float, Optional
Frequency range for k-k plots. [minf maxf].
Returns
-------
out : xarray.Dataset
Dataset of array of powers as a function of frequency and
wavenumber. Power is normalized to the maximum value.
Notes
-----
Wavelength must be larger than twice the spacecraft separations,
otherwise spatial aliasing will occur.
Examples
--------
>>> from pyrfu.mms import get_data, fk_power_spectrum_4sc
>>> from pyrfu.pyrf import extend_tint, convert_fac
Load data
>>> tint = ["2015-10-16T13:05:24.00", "2015-10-16T13:05:50.000"]
>>> ic = range(1, 5)
>>> b_fgm = [get_data("b_gse_fgm_brst_l2", tint, i) for i in ic]
>>> b_scm = [get_data("b_gse_scm_brst_l2", tint, i) for i in ic]
Load spacecraft position
>>> tint_long = extend_tint(tint, [-60, 60])
>>> r_gse_mms = [get_data("r_gse", tint_long, i) for i in range(1, 5)]
Convert magnetic field fluctuations to field aligned coordinates
>>> b_fac = [convert_fac(b_s, b_f) for b_s, b_f in zip(b_scm, b_fgm)]
>>> b_para = [b_s[:, 0] for b_s in b_fac]
Compute dispersion relation
>>> tint = ["2015-10-16T13:05:26.500", "2015-10-16T13:05:27.000"]
>>> pwer = fk_power_spectrum_4sc(b_para, r_gse, b_fgm, tint, 4, 500, 2,
... 10, 2)
"""
ic = np.arange(1, 5)
# Resample magnetic field to common time base
# and compute 4SC average
b = [resample(b[i - 1], b[0]) for i in ic]
b_avg = avg_4sc(b)
# Resample e and r to common time base
e = [resample(e[i - 1], e[0]) for i in ic]
r = [resample(r[i - 1], e[0]) for i in ic]
times = e[0].time
use_linear = df is not None
if use_linear:
cwt_options = {
"linear": df,
"return_power": False,
"wavelet_width": 5.36 * w_width,
"cut_edge": False,
}
else:
cwt_options = {
"n_freqs": num_f,
"return_power": False,
"wavelet_width": 5.36 * w_width,
"cut_edge": False,
}
w = [wavelet(e[i], **cwt_options) for i in range(4)]
num_f = len(w[0].frequency)
times = time_clip(times, tints)
nt = len(times)
# Ensure even number of time points
if nt % 2:
times = times[:-1]
nt -= 1
# Clip wavelet transforms to time interval
w = [w[i].sel(time=times) for i in range(4)]
# Compute averaged power spectrum from all spacecraft
fk_power = np.zeros((nt, num_f), dtype=np.float64)
for i in range(4):
fk_power += (w[i].data * np.conj(w[i].data) / 4).astype(np.float64)
# Find the time positions for averaging
cav = int(cav)
n = int(np.floor(nt / cav) - 1)
pos_av = cav // 2 + np.arange(n + 1) * cav - 1
av_times = times[pos_av]
# Resample background magnetic field and spacecraft positions to
# averaged time positions
b_avg = resample(b_avg, av_times)
r = [resample(r[i], av_times) for i in range(4)]
cx12, cx13, cx14 = [np.zeros((n + 1, num_f), dtype="complex128") for _ in range(3)]
cx23, cx24, cx34 = [np.zeros((n + 1, num_f), dtype="complex128") for _ in range(3)]
power_avg = np.zeros((n + 1, num_f), dtype="float64")
for m, pos_avm in enumerate(pos_av):
lb, ub = [pos_avm - cav // 2 + 1, pos_avm + cav // 2 + 1]
cx12[m, :] = np.nanmean(
w[0].data[lb:ub, :] * np.conj(w[1].data[lb:ub, :]),
axis=0,
)
cx13[m, :] = np.nanmean(
w[0].data[lb:ub, :] * np.conj(w[2].data[lb:ub, :]),
axis=0,
)
cx14[m, :] = np.nanmean(
w[0].data[lb:ub, :] * np.conj(w[3].data[lb:ub, :]),
axis=0,
)
cx23[m, :] = np.nanmean(
w[1].data[lb:ub, :] * np.conj(w[2].data[lb:ub, :]),
axis=0,
)
cx24[m, :] = np.nanmean(
w[1].data[lb:ub, :] * np.conj(w[3].data[lb:ub, :]),
axis=0,
)
cx34[m, :] = np.nanmean(
w[2].data[lb:ub, :] * np.conj(w[3].data[lb:ub, :]),
axis=0,
)
power_avg[m, :] = np.nanmean(fk_power[lb:ub, :], axis=0)
# Compute phase differences between each spacecraft pair
th12 = np.arctan2(np.imag(cx12), np.real(cx12))
th13 = np.arctan2(np.imag(cx13), np.real(cx13))
th14 = np.arctan2(np.imag(cx14), np.real(cx14))
th23 = np.arctan2(np.imag(cx23), np.real(cx23))
th24 = np.arctan2(np.imag(cx24), np.real(cx24))
th34 = np.arctan2(np.imag(cx34), np.real(cx34))
w_mat = 2 * np.pi * np.tile(w[0].frequency.data, (n + 1, 1))
# Convert phase difference to time delay
dt12 = th12 / w_mat
dt13 = th13 / w_mat
dt14 = th14 / w_mat
dt23 = th23 / w_mat
dt24 = th24 / w_mat
dt34 = th34 / w_mat
# Weighted averaged time delay using all spacecraft pairs
dt2 = (
0.5 * dt12
+ 0.2 * (dt13 - dt23)
+ 0.2 * (dt14 - dt24)
+ 0.1 * (dt14 - dt34 - dt23)
)
dt3 = (
0.5 * dt13
+ 0.2 * (dt12 + dt23)
+ 0.2 * (dt14 - dt34)
+ 0.1 * (dt12 + dt24 - dt34)
)
dt4 = (
0.5 * dt14
+ 0.2 * (dt12 + dt24)
+ 0.2 * (dt13 + dt34)
+ 0.1 * (dt12 + dt23 + dt34)
)
# Compute phase speeds
r = [r[i].data for i in range(4)]
k_x, k_y, k_z = [np.zeros((n + 1, num_f)) for _ in range(3)]
for ii in range(n + 1):
dr = np.array(
[
r[1][ii, :] - r[0][ii, :],
r[2][ii, :] - r[0][ii, :],
r[3][ii, :] - r[0][ii, :],
],
)
for jj in range(num_f):
m = linalg.solve(dr, np.array([dt2[ii, jj], dt3[ii, jj], dt4[ii, jj]]))
k_x[ii, jj] = 2 * np.pi * w[0].frequency[jj].data * m[0]
k_y[ii, jj] = 2 * np.pi * w[0].frequency[jj].data * m[1]
k_z[ii, jj] = 2 * np.pi * w[0].frequency[jj].data * m[2]
k_x, k_y, k_z = [k / 1e3 for k in [k_x, k_y, k_z]]
k_mag = np.linalg.norm(np.array([k_x, k_y, k_z]), axis=0)
# Compute the parallel and perpendicular wave numbers using the background magnetic
# field averaged over the four spacecraft. This is used to
b_avg_x_mat = np.tile(b_avg.data[:, 0], (num_f, 1)).T
b_avg_y_mat = np.tile(b_avg.data[:, 1], (num_f, 1)).T
b_avg_z_mat = np.tile(b_avg.data[:, 2], (num_f, 1)).T
b_avg_abs = np.linalg.norm(b_avg, axis=1)
b_avg_abs_mat = np.tile(b_avg_abs, (num_f, 1)).T
k_para = (k_x * b_avg_x_mat + k_y * b_avg_y_mat + k_z * b_avg_z_mat) / b_avg_abs_mat
k_perp = np.sqrt(k_mag**2 - k_para**2)
# Determine the maximum and minimum wave numbers
k_max = np.max(k_mag) * 1.1
k_min = -k_max
k_vec = np.linspace(-k_max, k_max, num_k)
k_mag_vec = np.linspace(0, k_max, num_k)
dk_mag = k_max / num_k
dk = 2 * k_max / num_k
# Sort power into frequency and wave vector
logging.info("Computing power versus (kx,f); (ky,f), (kz,f), (k,f)")
power_k_x_f, power_k_y_f, power_k_z_f = [np.zeros((num_f, num_k)) for _ in range(3)]
power_k_mag_f = np.zeros((num_f, num_k))
for mm in range(n + 1):
for nn in range(num_f):
k_x_number = int(np.floor((k_x[mm, nn] - k_min) / dk))
k_y_number = int(np.floor((k_y[mm, nn] - k_min) / dk))
k_z_number = int(np.floor((k_z[mm, nn] - k_min) / dk))
k_number = int(np.floor(k_mag[mm, nn] / dk_mag))
power_k_x_f[nn, k_x_number] += power_avg[mm, nn]
power_k_y_f[nn, k_y_number] += power_avg[mm, nn]
power_k_z_f[nn, k_z_number] += power_avg[mm, nn]
power_k_mag_f[nn, k_number] += power_avg[mm, nn]
# Normalize power to maximum value for plotting
power_k_x_f /= np.max(power_k_x_f)
power_k_y_f /= np.max(power_k_y_f)
power_k_z_f /= np.max(power_k_z_f)
power_k_mag_f /= np.max(power_k_mag_f)
frequencies = w[0].frequency.data
idx_f = np.arange(num_f)
if f_range is not None:
idx_min_freq = bisect.bisect_left(frequencies, np.min(f_range))
idx_max_freq = bisect.bisect_left(frequencies, np.max(f_range))
idx_f = idx_f[idx_min_freq:idx_max_freq]
# Sort power into wave vector space for k_x, k_y; k_x, k_z; k_y, k_z
logging.info("Computing power versus (kx,ky); (kx,kz); (ky,kz)")
power_k_x_k_y = np.zeros((num_k, num_k))
power_k_x_k_z = np.zeros((num_k, num_k))
power_k_y_k_z = np.zeros((num_k, num_k))
for mm in range(n + 1):
for nn in idx_f:
# Find the position of the power in the
# k_x, k_y; k_x, k_z; k_y, k_z space
k_x_number = int(np.floor((k_x[mm, nn] - k_min) / dk))
k_y_number = int(np.floor((k_y[mm, nn] - k_min) / dk))
k_z_number = int(np.floor((k_z[mm, nn] - k_min) / dk))
# Add the power to the corresponding position in the
# k_x, k_y; k_x, k_z; k_y, k_z space
power_k_x_k_y[k_y_number, k_x_number] += power_avg[mm, nn]
power_k_x_k_z[k_z_number, k_x_number] += power_avg[mm, nn]
power_k_y_k_z[k_z_number, k_y_number] += power_avg[mm, nn]
# Normalize power to maximum value for plotting
power_k_x_k_y /= np.max(power_k_x_k_y)
power_k_x_k_z /= np.max(power_k_x_k_z)
power_k_y_k_z /= np.max(power_k_y_k_z)
# Sort power into wave vector space for k_perp, k_para
logging.info("Computing power versus kperp,kpara")
power_k_perp_k_para = np.zeros((num_k, num_k))
for mm in range(n + 1):
for nn in idx_f:
# Find the position of the power in the k_para, k_perp space
k_para_number = int(np.floor((k_para[mm, nn] - k_min) / dk))
k_perp_number = int(np.floor((k_perp[mm, nn]) / dk_mag))
# Add the power to the corresponding position in the k_para, k_perp space
power_k_perp_k_para[k_para_number, k_perp_number] += power_avg[mm, nn]
# Normalize power to maximum value for plotting
power_k_perp_k_para /= np.max(power_k_perp_k_para)
# Set zero power to NaN for plotting purposes
power_k_x_f[power_k_x_f == 0] = np.nan
power_k_y_f[power_k_y_f == 0] = np.nan
power_k_z_f[power_k_z_f == 0] = np.nan
power_k_mag_f[power_k_mag_f == 0] = np.nan
power_k_x_k_y[power_k_x_k_y == 0] = np.nan
power_k_x_k_z[power_k_x_k_z == 0] = np.nan
power_k_y_k_z[power_k_y_k_z == 0] = np.nan
power_k_perp_k_para[power_k_perp_k_para == 0] = np.nan
# Find the wave numbers and frequencies weighted by power
# kmag, f
k_powers = np.nansum(power_k_mag_f[idx_f, :], axis=0)
f_powers = np.nansum(power_k_mag_f[idx_f, :], axis=1)
f_avg = np.nansum(frequencies[idx_f] * f_powers) / np.nansum(f_powers)
f_std = np.sqrt(
np.nansum((frequencies[idx_f] - f_avg) ** 2 * f_powers) / np.nansum(f_powers)
)
k_avg = np.nansum(k_mag_vec * k_powers) / np.nansum(k_powers)
k_std = np.sqrt(
np.nansum((k_mag_vec - k_avg) ** 2 * k_powers) / np.nansum(k_powers)
)
# kpara, kperp
kpara_powers = np.nansum(power_k_perp_k_para, axis=1)
kperp_powers = np.nansum(power_k_perp_k_para, axis=0)
kpar_avg = np.nansum(k_vec * kpara_powers) / np.nansum(kpara_powers)
kpar_std = np.sqrt(
np.nansum((k_vec - kpar_avg) ** 2 * kpara_powers) / np.nansum(kpara_powers)
)
kperp_avg = np.nansum(k_mag_vec * kperp_powers) / np.nansum(kperp_powers)
kperp_std = np.sqrt(
np.nansum((k_mag_vec - kperp_avg) ** 2 * kperp_powers) / np.nansum(kperp_powers)
)
# kx, ky, kz
kx_powers = np.nansum(power_k_x_k_y, axis=0)
ky_powers = np.nansum(power_k_x_k_y, axis=1)
kz_powers = np.nansum(power_k_x_k_z, axis=1)
kx_avg = np.nansum(k_vec * kx_powers) / np.nansum(kx_powers)
kx_std = np.sqrt(
np.nansum((k_vec - kx_avg) ** 2 * kx_powers) / np.nansum(kx_powers)
)
ky_avg = np.nansum(k_vec * ky_powers) / np.nansum(ky_powers)
ky_std = np.sqrt(
np.nansum((k_vec - ky_avg) ** 2 * ky_powers) / np.nansum(ky_powers)
)
kz_avg = np.nansum(k_vec * kz_powers) / np.nansum(kz_powers)
kz_std = np.sqrt(
np.nansum((k_vec - kz_avg) ** 2 * kz_powers) / np.nansum(kz_powers)
)
attrs = {
"f_avg": f_avg,
"f_std": f_std,
"k_mag_avg": k_avg,
"k_mag_std": k_std,
"k_para_avg": kpar_avg,
"k_para_std": kpar_std,
"k_perp_avg": kperp_avg,
"k_perp_std": kperp_std,
"k_x_avg": kx_avg,
"k_x_std": kx_std,
"k_y_avg": ky_avg,
"k_y_std": ky_std,
"k_z_avg": kz_avg,
"k_z_std": kz_std,
}
out_dict = {
"k_x_f": (["k_x", "f"], power_k_x_f.T),
"k_y_f": (["k_x", "f"], power_k_y_f.T),
"k_z_f": (["k_x", "f"], power_k_z_f.T),
"k_mag_f": (["k_mag", "f"], power_k_mag_f.T),
"k_x_k_y": (["k_x", "k_y"], power_k_x_k_y.T),
"k_x_k_z": (["k_x", "k_z"], power_k_x_k_z.T),
"k_y_k_z": (["k_y", "k_z"], power_k_y_k_z.T),
"k_perp_k_para": (["k_perp", "k_para"], power_k_perp_k_para.T),
"k_x": k_vec,
"k_y": k_vec,
"k_z": k_vec,
"k_mag": k_mag_vec,
"k_perp": k_mag_vec,
"k_para": k_vec,
"f": frequencies,
}
out = xr.Dataset(out_dict, attrs=attrs)
return out