Source code for pyrfu.pyrf.movmean
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Built-in imports
from typing import Any, Optional
# 3rd party imports
import numpy as np
import xarray as xr
from numpy.typing import NDArray
from xarray.core.dataarray import DataArray
__author__ = "Louis Richard"
__email__ = "louisr@irfu.se"
__copyright__ = "Copyright 2020-2024"
__license__ = "MIT"
__version__ = "2.4.13"
__status__ = "Prototype"
[docs]def movmean(inp: DataArray, window_size: Optional[int] = None) -> DataArray:
r"""Computes running average of the inp over npts points.
Parameters
----------
inp : DataArray
Time series of the input variable.
window_size : int, Optional
Number of points to average over. Default is a 4-point running average.
Returns
-------
DataArray
Time series of the input variable averaged over npts points.
Raises
------
TypeError
If inp is not a DataArray.
ValueError
If window_size is smaller than 2 or larger than the length of the data.
Notes
-----
Works also with 3D skymap distribution.
Examples
--------
>>> from pyrfu import mms, pyrf
Time interval
>>> tint = ["2019-09-14T07:54:00.000","2019-09-14T08:11:00.000"]
Spacecraft index
>>> mms_id = 1
Load ion pressure tensor
>>> p_xyz_i = mms.get_data("Pi_gse_fpi_brst_l2", tint, mms_id)
Running average the pressure tensor over 10s
>>> fs = pyrf.calc_fs(p_xyz_i)
>>> p_xyz_i = pyrf.movmean(p_xyz_i, int(10 * fs))
"""
# Checks if input is a DataArray
if not isinstance(inp, xr.DataArray):
raise TypeError("Input must be a xarray.DataArray")
# Gets input data and time
time: NDArray[np.datetime64] = inp.time.data
inp_data: NDArray[np.float32] = inp.data
# Checks if window_size is defined
if window_size is None:
window_size = 2
elif window_size < 2 or window_size > len(time):
raise ValueError(
"Window size must be larger than 2 and smaller than the length of the data."
)
# Checks if window_size is odd. If not, makes it odd.
if window_size % 2:
window_size -= 1
# Preallocates output
out_dat: NDArray[np.float32] = np.empty(
(len(time) - window_size, *inp_data.shape[1:]), dtype=np.float32
)
# Computes moving average
cum_sum: NDArray[np.float32] = np.cumsum(inp_data, axis=0)
out_dat[...] = (
cum_sum[window_size:, ...] - cum_sum[:-window_size, ...]
) / window_size
# Gets coordinates
coords: list[NDArray[Any]] = [
time[int(window_size / 2) : -int(window_size / 2)],
*[inp.coords[k].data for k in inp.dims[1:]],
]
# Output in DataArray type
out: DataArray = xr.DataArray(
out_dat, coords=coords, dims=inp.dims, attrs=inp.attrs
)
return out