Source code for pyrfu.mms.get_ts

#!/usr/bin/env python
# -*- coding: utf-8 -*-

# Built-in import
import re
import warnings

# 3rd party imports
import numpy as np
import xarray as xr
from pycdfpp import DataType, load, to_datetime64

# Local imports
from ..pyrf.datetime642iso8601 import datetime642iso8601
from ..pyrf.iso86012datetime64 import iso86012datetime64
from ..pyrf.time_clip import time_clip
from .get_variable import _pycdfpp_attributes_to_dict

__author__ = "Louis Richard"
__email__ = "louisr@irfu.se"
__copyright__ = "Copyright 2020-2023"
__license__ = "MIT"
__version__ = "2.4.2"
__status__ = "Prototype"


def _shift_epochs(file, epoch):
    r"""Shift times for particles."""

    epoch_shifted = epoch["data"].copy()

    try:
        delta_minus_key = epoch["attrs"]["DELTA_MINUS_VAR"]
        delta_minus_var = {
            "data": file[delta_minus_key].values,
            "attrs": _pycdfpp_attributes_to_dict(file[delta_minus_key].attributes),
        }
        delta_plus_key = epoch["attrs"]["DELTA_PLUS_VAR"]
        delta_plus_var = {
            "data": file[delta_plus_key].values,
            "attrs": _pycdfpp_attributes_to_dict(file[delta_plus_key].attributes),
        }

        delta_vars = [delta_minus_var, delta_plus_var]
        flags_vars = [1e3, 1e3]  # scale deltas to ms (default assumes seconds)

        for i, delta_var in enumerate(delta_vars):
            if isinstance(delta_var["attrs"], dict) and "UNITS" in delta_var["attrs"]:
                if delta_var["attrs"]["UNITS"].lower() == "s":
                    flags_vars[i] = 1e3
                elif delta_var["attrs"]["UNITS"].lower() == "ms":
                    flags_vars[i] = 1e0
                else:
                    message = " units are not clear, assume s"
                    warnings.warn(message)
            else:
                message = "Epoch_plus_var/Epoch_minus_var units are not clear, assume s"
                warnings.warn(message)

        flag_minus, flag_plus = flags_vars

        # Offset to shift each timestamp towards the centre of its sampling
        # interval: (DELTA_PLUS - DELTA_MINUS) / 2, converted from ms to ns
        # (the deltas were scaled to ms above).
        t_offset = (
            delta_plus_var["data"] * flag_plus - delta_minus_var["data"] * flag_minus
        )
        t_offset = (np.round(t_offset, 1) * 1e6 / 2).astype("timedelta64[ns]")

        # Fall back to half the median sampling period if it does not match
        # the mean offset derived from the delta variables.
        t_diff = t_offset
        t_diff_data = np.median(np.diff(epoch["data"])) / 2

        if t_diff_data != np.mean(t_diff):
            t_offset = t_diff_data

        epoch_shifted += t_offset

        return {"data": epoch_shifted, "attrs": epoch["attrs"]}

    except KeyError:
        return {"data": epoch_shifted, "attrs": epoch["attrs"]}


def _get_epochs(file, cdf_name):
    r"""Get epochs form cdf and shift if needed."""

    depend0_key = file[cdf_name].attributes["DEPEND_0"][0]

    out = {
        "data": file[depend0_key].values,
    }

    if file[depend0_key].type == DataType.CDF_TIME_TT2000:
        try:
            out["data"] = to_datetime64(out["data"])

            # Get epoch attributes
            out["attrs"] = _pycdfpp_attributes_to_dict(file[depend0_key].attributes)

            # Shift times if particle data
            is_part = re.search(
                "^mms[1-4]_d[ei]s_",
                cdf_name,
            )  # Is it FPI data?
            is_part = is_part or re.search(
                "^mms[1-4]_hpca_",
                cdf_name,
            )  # Is it HPCA data?

            if is_part:
                out = _shift_epochs(file, out)

        except TypeError:
            pass

    return out
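
# Note: only variables whose names match "mms[1-4]_des_"/"mms[1-4]_dis_" (FPI)
# or "mms[1-4]_hpca_" (HPCA) are treated as particle data and get their epochs
# shifted by _shift_epochs. For instance, a variable name such as
# "mms1_des_energyspectr_omni_brst" (illustrative) matches the FPI pattern.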


def _get_depend_attributes(file, depend_key):
    r"""Get the attributes of a DEPEND/REPRESENTATION variable."""

    attributes = _pycdfpp_attributes_to_dict(file[depend_key].attributes)

    # Remove spaces in label and normalise the "Diffential_energy_channels"
    # spelling.
    try:
        attributes["LABLAXIS"] = attributes["LABLAXIS"].replace(" ", "_")

        if attributes["LABLAXIS"] == "Diffential_energy_channels":
            attributes["LABLAXIS"] = "Differential_energy_channels"

    except (KeyError, AttributeError):
        attributes["LABLAXIS"] = "comp"

    return attributes
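
# Note: LABLAXIS is later used as an xarray dimension name in get_ts, which is
# why spaces are replaced; e.g. a label such as "Energy ch" (illustrative)
# becomes "Energy_ch".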


def _get_depend(file, cdf_name, dep_num=1):
    r"""Get the DEPEND_{dep_num}/REPRESENTATION_{dep_num} coordinate."""

    out = {}

    if f"DEPEND_{dep_num:d}" in file[cdf_name].attributes:
        depend_key = file[cdf_name].attributes[f"DEPEND_{dep_num:d}"][0]
    elif f"REPRESENTATION_{dep_num:d}" in file[cdf_name].attributes:
        depend_key = file[cdf_name].attributes[f"REPRESENTATION_{dep_num:d}"][0]
    else:
        raise KeyError(f"no DEPEND_{dep_num:d}/REPRESENTATION_{dep_num:d} attributes")

    if depend_key == "x,y,z":
        out["data"] = np.array(depend_key.split(","))

        out["attrs"] = {"LABLAXIS": "comp"}
    else:
        out["data"] = file[depend_key].values

        if len(out["data"]) == 1:
            out["data"] = out["data"][0]

        if len(out["data"]) == 4 and all(
            out["data"].astype(str) == ["x", "y", "z", "r"]
        ):
            out["data"] = out["data"].astype(str)[:-1]

        elif out["data"].ndim == 2:
            if len(out["data"].flatten()) == 3:
                out["data"] = out["data"].flatten()
            else:
                try:
                    out["data"] = out["data"][0, :]
                except IndexError:
                    pass

        out["attrs"] = _get_depend_attributes(file, depend_key)

    return out
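
# Note on _get_depend: the coordinate is read from the DEPEND_n variable when
# present, otherwise from REPRESENTATION_n. When the resolved key is the
# literal string "x,y,z" (as for some REPRESENTATION_n attributes), it is
# expanded to the component labels ["x", "y", "z"] with LABLAXIS "comp"
# instead of being read from another CDF variable; e.g. (illustrative) a
# vector field carrying REPRESENTATION_1 = "x,y,z" gets a 3-element "comp"
# coordinate.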


def get_ts(file_path, cdf_name, tint: list = None):
    r"""Read the field named cdf_name in the file and convert it to a time series.

    Parameters
    ----------
    file_path : str
        Path of the cdf file.
    cdf_name : str
        Name of the target variable in the cdf file.
    tint : list of str, Optional
        Time interval, as ISO 8601 strings or datetime64.

    Returns
    -------
    out : xarray.DataArray
        Time series of the target variable in the selected time interval.

    """

    # Check time interval type
    if tint is None:
        tint = ["1995-10-06T18:50:00.000000000", "2200-10-06T18:50:00.000000000"]
    elif isinstance(tint, (np.ndarray, list)):
        if isinstance(tint[0], np.datetime64):
            tint = datetime642iso8601(np.array(tint))
        elif isinstance(tint[0], str):
            tint = iso86012datetime64(
                np.array(tint),
            )  # make sure the strings are valid ISO 8601
            tint = datetime642iso8601(np.array(tint))
        else:
            raise TypeError("Values must be in datetime64 or str!!")
    else:
        raise TypeError("tint must be array_like!!")

    out_dict = {}

    time, depend_1, depend_2, depend_3 = [{}, {}, {}, {}]

    # Load CDF file
    file = load(file_path)

    # Merge variable and global attributes, sorted by key
    var_attrs = _pycdfpp_attributes_to_dict(file[cdf_name].attributes)
    glb_attrs = _pycdfpp_attributes_to_dict(file.attributes)
    out_dict["attrs"] = {"GLOBAL": glb_attrs, **var_attrs}
    out_dict["attrs"] = {k: out_dict["attrs"][k] for k in sorted(out_dict["attrs"])}

    assert "DEPEND_0" in var_attrs and "epoch" in var_attrs["DEPEND_0"].lower()

    time = _get_epochs(file, cdf_name)

    if time["data"] is None:
        return None

    if "DEPEND_1" in var_attrs or "REPRESENTATION_1" in var_attrs:
        depend_1 = _get_depend(file, cdf_name, 1)
    elif "afg" in cdf_name or "dfg" in cdf_name:
        depend_1 = {
            "data": ["x", "y", "z"],
            "attrs": {"LABLAXIS": "comp"},
        }

    if "DEPEND_2" in var_attrs or "REPRESENTATION_2" in var_attrs:
        depend_2 = _get_depend(file, cdf_name, 2)

        if depend_2["attrs"]["LABLAXIS"] == depend_1["attrs"]["LABLAXIS"]:
            depend_1["attrs"]["LABLAXIS"] = "rcomp"
            depend_2["attrs"]["LABLAXIS"] = "ccomp"

    if "DEPEND_3" in var_attrs or "REPRESENTATION_3" in var_attrs:
        if "REPRESENTATION_3" in var_attrs:
            assert out_dict["attrs"]["REPRESENTATION_3"] != "x,y,z"

        depend_3 = _get_depend(file, cdf_name, 3)

        if depend_3["attrs"]["LABLAXIS"] == depend_2["attrs"]["LABLAXIS"]:
            depend_2["attrs"]["LABLAXIS"] = "rcomp"
            depend_3["attrs"]["LABLAXIS"] = "ccomp"

    if "sector_mask" in cdf_name:
        cdf_name_mask = cdf_name.replace("sector_mask", "intensity")
        depend_1_key = file[cdf_name_mask].attributes["DEPEND_1"][0]
        depend_1["data"] = file[depend_1_key].values
        depend_1["attrs"] = _pycdfpp_attributes_to_dict(file[depend_1_key].attributes)
        depend_1["attrs"]["LABLAXIS"] = depend_1["attrs"]["LABLAXIS"].replace(" ", "_")

    if "edp_dce_sensor" in cdf_name:
        depend_1["data"] = ["x", "y", "z"]
        depend_1["attrs"] = {"LABLAXIS": "comp"}

    out_dict["data"] = file[cdf_name].values

    if out_dict["data"].ndim == 2 and out_dict["data"].shape[1] == 4:
        out_dict["data"] = out_dict["data"][:, :-1]
        # depend_1["data"] = depend_1["data"][:-1]

    if out_dict["data"].ndim == 2 and not depend_1:
        depend_1["data"] = np.arange(out_dict["data"].shape[1])
        depend_1["attrs"] = {"LABLAXIS": "idx"}

    if time and not depend_1 and not depend_2 and not depend_3:
        dims = ["time"]
        coords_data = [time["data"]]
        coords_attrs = [time["attrs"]]
    elif time and depend_1 and not depend_2 and not depend_3:
        dims = ["time", depend_1["attrs"]["LABLAXIS"]]
        coords_data = [time["data"], depend_1["data"]]
        coords_attrs = [time["attrs"], depend_1["attrs"]]
    elif time and depend_1 and depend_2 and not depend_3:
        dims = [
            "time",
            depend_1["attrs"]["LABLAXIS"],
            depend_2["attrs"]["LABLAXIS"],
        ]
        coords_data = [time["data"], depend_1["data"], depend_2["data"]]
        coords_attrs = [time["attrs"], depend_1["attrs"], depend_2["attrs"]]
    elif time and depend_1 and depend_2 and depend_3:
        dims = [
            "time",
            depend_1["attrs"]["LABLAXIS"],
            depend_2["attrs"]["LABLAXIS"],
            depend_3["attrs"]["LABLAXIS"],
        ]
        coords_data = [
            time["data"],
            depend_1["data"],
            depend_2["data"],
            depend_3["data"],
        ]
        coords_attrs = [
            time["attrs"],
            depend_1["attrs"],
            depend_2["attrs"],
            depend_3["attrs"],
        ]
    else:
        raise NotImplementedError

    out = xr.DataArray(
        out_dict["data"],
        coords=coords_data,
        dims=dims,
        attrs=out_dict["attrs"],
    )

    for dim, coord_attrs in zip(dims, coords_attrs):
        # Sort attributes and set to coordinates attribute
        out[dim].attrs = {k: coord_attrs[k] for k in sorted(coord_attrs)}

    # Time clip to original time interval
    out = time_clip(out, tint)

    return out