Source code for pyrfu.pyrf.corr_deriv

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# 3rd party imports
import numpy as np

# Local imports
from .find_closest import find_closest

__author__ = "Louis Richard"
__email__ = "louisr@irfu.se"
__copyright__ = "Copyright 2020-2023"
__license__ = "MIT"
__version__ = "2.4.2"
__status__ = "Prototype"


[docs]def corr_deriv(inp0, inp1, flag: bool = False): r"""Correlate the derivatives of two time series Parameters ---------- inp0 : xarray.DataArray Time series of the first to variable to correlate with. inp1 : xarray.DataArray Time series of the second to variable to correlate with. flag : bool, Optional Flag if False (default) returns time instants of common highest first and second derivatives. If True returns time instants of common highest first derivative and zeros crossings. Returns ------- t1_d, t2_d : ndarray Time instants of common highest first derivatives. t1_dd, t2_dd : ndarray Time instants of common highest second derivatives or zero crossings. """ # 1st derivative tx1 = inp0.time.data.astype(np.int64) * 1e-9 inp0 = inp0.data dtx1 = tx1[:-1] + 0.5 * np.diff(tx1) dx1 = np.diff(inp0) tx2 = inp1.time.data.astype(np.int64) * 1e-9 inp1 = inp1.data dtx2 = tx2[:-1] + 0.5 * np.diff(tx2) dx2 = np.diff(inp1) ind_zeros1 = np.where(np.sign(dx1[:-1] * dx1[1:]) < 0)[0] if ind_zeros1 == 0: ind_zeros1 = ind_zeros1[1:] ind_zeros2 = np.where(np.sign(dx2[:-1] * dx2[1:]) < 0)[0] if ind_zeros2 == 0: ind_zeros2 = ind_zeros2[1:] ind_zeros1_p = np.where(dx1[ind_zeros1 - 1] - dx1[ind_zeros1] > 0)[0] ind_zeros2_p = np.where(dx2[ind_zeros2 - 1] - dx2[ind_zeros2] > 0)[0] ind_zeros1_m = np.where(dx1[ind_zeros1 - 1] - dx1[ind_zeros1] < 0)[0] ind_zeros2_m = np.where(dx2[ind_zeros2 - 1] - dx2[ind_zeros2] < 0)[0] ind1_p = ind_zeros1[ind_zeros1_p] ind1_m = ind_zeros1[ind_zeros1_m] t_zeros1_p = dtx1[ind1_p] + (dtx1[ind1_p + 1] - dtx1[ind1_p]) / ( 1 + np.abs(dx1[ind1_p + 1]) / np.abs(dx1[ind1_p]) ) t_zeros1_m = dtx1[ind1_m] + (dtx1[ind1_m + 1] - dtx1[ind1_m]) / ( 1 + np.abs(dx1[ind1_m + 1]) / np.abs(dx1[ind1_m]) ) ind2_p = ind_zeros2[ind_zeros2_p] ind2_m = ind_zeros2[ind_zeros2_m] t_zeros2_p = dtx2[ind2_p] + (dtx2[ind2_p + 1] - dtx2[ind2_p]) / ( 1 + np.abs(dx2[ind2_p + 1]) / np.abs(dx2[ind2_p]) ) t_zeros2_m = dtx2[ind2_m] + (dtx2[ind2_m + 1] - dtx2[ind2_m]) / ( 1 + np.abs(dx2[ind2_m + 1]) / np.abs(dx2[ind2_m]) ) # Remove repeating points t_zeros1_p = np.delete(t_zeros1_p, np.where(np.diff(t_zeros1_p) == 0)[0]) t_zeros2_p = np.delete(t_zeros2_p, np.where(np.diff(t_zeros2_p) == 0)[0]) # Define identical pairs of two time axis t1_d_p, t2_d_p, _, _ = find_closest(t_zeros1_p, t_zeros2_p) t1_d_m, t2_d_m, _, _ = find_closest(t_zeros1_m, t_zeros2_m) t1_d = np.vstack([t1_d_p, t1_d_m]) t1_d = t1_d[t1_d[:, 0].argsort(), 0] t2_d = np.vstack([t2_d_p, t2_d_m]) t2_d = t2_d[t2_d[:, 0].argsort(), 0] if flag: # zero crossings ind_zeros1 = np.where(np.sign(inp0[:-1] * inp0[1:]) < 0)[0] ind_zeros2 = np.where(np.sign(inp1[:-1] * inp1[1:]) < 0)[0] ind_zeros1 = np.delete(ind_zeros1, np.where(ind_zeros1 == 1)[0]) ind_zeros2 = np.delete(ind_zeros2, np.where(ind_zeros2 == 1)[0]) ind_zeros1_p = np.where(inp0[ind_zeros1 - 1] - inp0[ind_zeros1] > 0)[0] ind_zeros2_p = np.where(inp1[ind_zeros2 - 1] - inp1[ind_zeros2] > 0)[0] ind_zeros1_m = np.where(inp0[ind_zeros1 - 1] - inp0[ind_zeros1] < 0)[0] ind_zeros2_m = np.where(inp1[ind_zeros2 - 1] - inp1[ind_zeros2] < 0)[0] ind1_p = ind_zeros1[ind_zeros1_p] ind1_m = ind_zeros1[ind_zeros1_m] t_zeros1_p = tx1[ind1_p] + (tx1[ind1_p + 1] - tx1[ind1_p]) / ( 1 + np.abs(inp0[ind1_p + 1]) / np.abs(inp0[ind1_p]) ) t_zeros1_m = tx1[ind1_m] + (tx1[ind1_m + 1] - tx1[ind1_m]) / ( 1 + np.abs(inp0[ind1_m + 1]) / np.abs(inp0[ind1_m]) ) ind2_p = ind_zeros2[ind_zeros2_p] ind2_m = ind_zeros2[ind_zeros2_m] t_zeros2_p = tx2[ind2_p] + (tx2[ind2_p + 1] - tx2[ind2_p]) / ( 1 + np.abs(inp1[ind2_p + 1]) / np.abs(inp1[ind2_p]) ) t_zeros2_m = tx2[ind2_m] + (tx2[ind2_m + 1] - tx2[ind2_m]) / ( 1 + np.abs(inp1[ind2_m + 1]) / np.abs(inp1[ind2_m]) ) else: # 2nd derivative dd_tx1 = dtx1[:-1] + 0.5 * np.diff(dtx1) ddx1 = np.diff(dx1) dd_tx2 = dtx2[:-1] + 0.5 * np.diff(dtx2) ddx2 = np.diff(dx2) ind_zeros1 = np.where(np.sign(ddx1[:-1] * ddx1[1:]) < 0)[0] ind_zeros2 = np.where(np.sign(ddx2[:-1] * ddx2[1:]) < 0)[0] ind_zeros1 = np.delete(ind_zeros1, np.where(ind_zeros1 == 1)[0]) ind_zeros2 = np.delete(ind_zeros2, np.where(ind_zeros2 == 1)[0]) ind_zeros1_p = np.where(ddx1[ind_zeros1 - 1] - ddx1[ind_zeros1] > 0)[0] ind_zeros2_p = np.where(ddx2[ind_zeros2 - 1] - ddx2[ind_zeros2] > 0)[0] ind_zeros1_m = np.where(ddx1[ind_zeros1 - 1] - ddx1[ind_zeros1] < 0)[0] ind_zeros2_m = np.where(ddx2[ind_zeros2 - 1] - ddx2[ind_zeros2] < 0)[0] ind1_p = ind_zeros1[ind_zeros1_p] ind1_m = ind_zeros1[ind_zeros1_m] t_zeros1_p = dd_tx1[ind1_p] + (dd_tx1[ind1_p + 1] - dd_tx1[ind1_p]) / ( 1 + np.abs(ddx1[ind1_p + 1]) / np.abs(ddx1[ind1_p]) ) t_zeros1_m = dd_tx1[ind1_m] + (dd_tx1[ind1_m + 1] - dd_tx1[ind1_m]) / ( 1 + np.abs(ddx1[ind1_m + 1]) / np.abs(ddx1[ind1_m]) ) ind2_p = ind_zeros2[ind_zeros2_p] ind2_m = ind_zeros2[ind_zeros2_m] t_zeros2_p = dd_tx2[ind2_p] + (dd_tx2[ind2_p + 1] - dd_tx2[ind2_p]) / ( 1 + np.abs(ddx2[ind2_p + 1]) / np.abs(ddx2[ind2_p]) ) t_zeros2_m = dd_tx2[ind2_m] + (dd_tx2[ind2_m + 1] - dd_tx2[ind2_m]) / ( 1 + np.abs(ddx2[ind2_m + 1]) / np.abs(ddx2[ind2_m]) ) # Define identical pairs of two time axis t1_dd_p, t2_dd_p, _, _ = find_closest(t_zeros1_p, t_zeros2_p) t1_dd_m, t2_dd_m, _, _ = find_closest(t_zeros1_m, t_zeros2_m) t1_dd = np.vstack([t1_dd_p, t1_dd_m]) t1_dd = t1_dd[t1_dd[:, 0].argsort(), 0] t2_dd = np.vstack([t2_dd_p, t2_dd_m]) t2_dd = t2_dd[t2_dd[:, 0].argsort(), 0] return t1_d, t2_d, t1_dd, t2_dd