#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Built-in imports
import bisect
import logging
# 3rd party imports
import numpy as np
import xarray as xr
from scipy import interpolate
__author__ = "Louis Richard"
__email__ = "louisr@irfu.se"
__copyright__ = "Copyright 2020-2023"
__license__ = "MIT"
__version__ = "2.4.2"
__status__ = "Prototype"
logging.captureWarnings(True)
logging.basicConfig(
format="[%(asctime)s] %(levelname)s: %(message)s",
datefmt="%d-%b-%y %H:%M:%S",
level=logging.INFO,
)
def _guess_sampling_frequency(ref_time):
r"""Compute sampling frequency of the time line."""
n_data = len(ref_time)
sfy1 = 1 / (ref_time[1] - ref_time[0])
sfy = None
not_found = True
if n_data == 2:
sfy = sfy1
not_found = False
cur, max_try = [2, 10]
while not_found and cur < n_data and cur - 3 < max_try:
sfy = 1 / (ref_time[cur] - ref_time[cur - 1])
if np.absolute(sfy - sfy1) < sfy * 0.001:
not_found = False
sfy = (sfy + sfy1) / 2
break
sfy = sfy1
cur += 1
if not_found:
raise RuntimeError(f"Cannot guess sampling frequency. Tried {max_try:d} times")
return sfy
def _average(inp_time, inp_data, ref_time, thresh, dt2):
r"""Resample inp_data to timeline of ref_time, using half-window of dt2.
Points above std*tresh are excluded. thresh=0 turns off this option.
"""
try:
out_data = np.zeros([len(ref_time), *inp_data.shape[1:]])
except IndexError:
inp_data = inp_data[:, None]
out_data = np.zeros((len(ref_time), inp_data.shape[1]))
for i, ref_t in enumerate(ref_time):
idx_l = bisect.bisect_left(inp_time, ref_t - dt2)
idx_r = bisect.bisect_right(inp_time, ref_t + dt2)
idx = np.arange(idx_l, idx_r)
if idx.size == 0:
out_data[i, ...] = np.nan
else:
if thresh:
std_ = np.std(inp_data[idx, ...], axis=0)
mean_ = np.mean(inp_data[idx, ...], axis=0)
assert any(np.isnan(std_))
for j, stdd in enumerate(std_):
if not np.isnan(stdd):
idx_r = bisect.bisect_right(
inp_data[idx, j + 1] - mean_[j],
thresh * stdd,
)
if idx_r:
out_data[i, j + 1] = np.mean(
inp_data[idx[idx_r], j + 1],
axis=0,
)
else:
out_data[i, j + 1] = np.nan
else:
out_data[i, ...] = np.nan
else:
out_data[i, ...] = np.mean(inp_data[idx, ...], axis=0)
if out_data.ndim > 1 and out_data.shape[1] == 1:
out_data = out_data[:, 0]
return out_data
def _resample_dataarray(inp, ref, method, f_s, window, thresh):
r"""Resample for time series (xarray.DataArray)"""
flag_do = "check"
if method:
flag_do = "interpolation"
if f_s is not None:
sfy = f_s
elif window is not None:
sfy = 1 / window
else:
sfy = None
inp_time = inp.time.data.view("i8") * 1e-9
ref_time = ref.time.data.view("i8") * 1e-9
if flag_do == "check":
if len(ref_time) > 1:
if not sfy:
sfy = _guess_sampling_frequency(ref_time)
if len(inp_time) / (inp_time[-1] - inp_time[0]) > 2 * sfy:
flag_do = "average"
logging.info("Using averages in resample")
else:
flag_do = "interpolation"
else:
flag_do = "interpolation"
assert flag_do in ["average", "interpolation"]
if flag_do == "average":
assert not method, "cannot mix interpolation and averaging flags"
if not sfy:
sfy = _guess_sampling_frequency(ref_time)
out_data = _average(inp_time, inp.data, ref_time, thresh, 0.5 / sfy)
else:
if not method:
method = "linear"
# If time series agree, no interpolation is necessary.
if len(inp_time) == len(ref_time) and all(inp_time == ref_time):
out_data = inp.data.copy()
coord = [ref.coords["time"].data]
if len(inp.coords) > 1:
for k in list(inp.dims)[1:]:
coord.append(inp.coords[k].data)
out = xr.DataArray(
out_data,
coords=coord,
dims=inp.dims,
attrs=inp.attrs,
)
return out
tck = interpolate.interp1d(
inp_time,
inp.data,
kind=method,
axis=0,
fill_value="extrapolate",
)
out_data = tck(ref_time)
coord = [ref.coords["time"]]
if len(inp.coords) > 1:
for k in list(inp.dims)[1:]:
coord.append(inp.coords[k].data)
out = xr.DataArray(out_data, coords=coord, dims=inp.dims, attrs=inp.attrs)
return out
def _resample_dataset(inp, ref, **kwargs):
r"""Resample for VDFs (xarray.Dataset)"""
# Find time dependent zVariables and resample
tdepnd_zvars = list(filter(lambda x: "time" in inp[x].dims, inp))
out_dict = {k: _resample_dataarray(inp[k], ref, **kwargs) for k in tdepnd_zvars}
# Complete the dictionary with non-time dependent zVaraiables
ndepnd_zvars = list(filter(lambda x: x not in tdepnd_zvars, inp))
out_dict = {**out_dict, **{k: inp[k] for k in ndepnd_zvars}}
# Find array_like attributes
arr_attrs = filter(
lambda x: isinstance(inp.attrs[x], np.ndarray),
inp.attrs,
)
arr_attrs = list(arr_attrs)
# Initialize attributes dictionary with non array_like attributes
gen_attrs = filter(lambda x: x not in arr_attrs, inp.attrs)
out_attrs = {k: inp.attrs[k] for k in list(gen_attrs)}
for k in arr_attrs:
attr = inp.attrs[k]
# If array_like attributes have one dimension equal to time length
# assume time dependent. One option would be move the time dependent
# array_like attributes to time series to zVaraibles to avoid
# confusion
if attr.shape[0] == len(inp.time.data):
coords = [np.arange(attr.shape[i + 1]) for i in range(attr.ndim - 1)]
dims = [f"idx{i:d}" for i in range(attr.ndim - 1)]
attr_ts = xr.DataArray(
attr,
coords=[inp.time.data, *coords],
dims=["time", *dims],
)
out_attrs[k] = _resample_dataarray(attr_ts, ref, **kwargs).data
else:
out_attrs[k] = attr
out_attrs = {k: out_attrs[k] for k in sorted(out_attrs)}
# Make output Dataset
out = xr.Dataset(out_dict, attrs=out_attrs)
return out
[docs]def resample(
inp,
ref,
method: str = "",
f_s: float = None,
window: int = None,
thresh: float = 0,
):
r"""Resample inp to the time line of ref. If sampling of X is more than two
times higher than Y, we average X, otherwise we interpolate X.
Parameters
----------
inp : xarray.DataArray or xarray.Dataset
Time series to resample.
ref : xarray.DataArray
Reference time line.
method : str, Optional
Method of interpolation "spline", "linear" etc.
(default "linear") if method is given then interpolate
independent of sampling.
f_s : float, Optional
Sampling frequency of the Y signal, 1/window.
window : int or float or ndarray, Optional
Length of the averaging window, 1/fsample.
thresh : float, Optional
Points above STD*THRESH are disregarded for averaging
Returns
-------
out : xarray.DataArray
Resampled input to the reference time line using the selected method.
Examples
--------
>>> from pyrfu import mms, pyrf
Time interval
>>> tint = ["2015-10-30T05:15:20.000", "2015-10-30T05:16:20.000"]
Spacecraft index
>>> mms_id = 1
Load magnetic field and electric field
>>> b_xyz = mms.get_data("e_gse_fgm_srvy_l2", tint, mms_id)
>>> e_xyz = mms.get_data("e_gse_edp_fast_l2", tint, mms_id)
Resample magnetic field to electric field sampling
>>> b_xyz = pyrf.resample(b_xyz, e_xyz)
"""
message = "Invalid input type. Input must be xarray.DataArary or xarray.Dataset"
assert isinstance(inp, (xr.DataArray, xr.Dataset)), message
options = {"method": method, "f_s": f_s, "window": window, "thresh": thresh}
if isinstance(inp, xr.DataArray):
out = _resample_dataarray(inp, ref, **options)
else:
out = _resample_dataset(inp, ref, **options)
return out