#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Third party imports
import numpy as np
# Built-in imports
import tqdm
import xarray as xr
from scipy.constants import electron_mass, electron_volt, proton_mass, speed_of_light
# Local imports
from ..pyrf.datetime642iso8601 import datetime642iso8601
from ..pyrf.int_sph_dist import int_sph_dist
from ..pyrf.resample import resample
from ..pyrf.time_clip import time_clip
from ..pyrf.ts_scalar import ts_scalar
__author__ = "Louis Richard"
__email__ = "louisr@irfu.se"
__copyright__ = "Copyright 2020-2023"
__license__ = "MIT"
__version__ = "2.4.2"
__status__ = "Prototype"
[docs]def reduce(vdf, xyz, dim: str = "1d", base: str = "pol", **kwargs):
r"""Reduces (integrates) 3D distribution to 1D (line) or 2D (plane).
Draft do not use!!
Parameters
----------
vdf : xarray.Dataset
3D skymap velocity distribution function.
xyz : xarray.DataArray or numpy.ndarray
Transformation matrix from instrument frame to desired frame.
base : str, Optional
Base for the 2D projection either cartesian 'cart' (default) or
polar 'pol'.
**kwargs
Keyword arguments
Returns
-------
out : xarray.DataArray
Time series of reduced velocity distribution function.
"""
# Make sure the projection dimension amd base are correct
assert dim.lower() in ["1d", "2d", "3d"], "Invalid projection dimension!!"
assert base.lower() in ["cart", "pol"], "Invalid projection base!!"
# Lower energy bound of instrument bins
delta_energy_minu = xr.DataArray(
vdf.attrs["delta_energy_minus"],
coords=[vdf.time.data, vdf.idx0.data],
dims=["time", "idx0"],
)
# Clip the distribution. If no time interval provided use the entire
# time series.
vdf_time = vdf.time
tint = kwargs.get("tint", list(datetime642iso8601(vdf_time.data[[0, -1]])))
vdf_time = time_clip(vdf_time, tint)
vdf_energy = time_clip(vdf.energy, tint).copy()
delta_energy_minu = time_clip(delta_energy_minu, tint)
vdf_phi = time_clip(vdf.phi, tint).copy()
vdf_theta = vdf.theta.copy()
vdf_data = time_clip(vdf.data, tint).copy()
# make input distribution to SI units, s^3/m^6
if vdf.data.attrs["UNITS"].lower() == "s^3/cm^6":
vdf_data *= 1e12
elif vdf.data.attrs["UNITS"].lower() == "s^3/m^6":
vdf_data *= 1e0
elif vdf.data.attrs["UNITS"].lower() == "s^3/km^6":
vdf_data *= 1e-18
else:
raise ValueError("Invalid units!!")
# Get VDF dimension (time, energy, phi, theta)
# n_t, _, n_ph, _ = vdf_data.shape
n_t, n_en, _, _ = vdf_data.shape
# Construct or resample the time series of transformation matrix
if isinstance(xyz, xr.DataArray):
# If time series, clip to time interval and resample to VDF's time line
xyz = time_clip(xyz, tint)
xyz = resample(xyz, vdf_time).data
elif isinstance(xyz, np.ndarray) and xyz.ndim == 2:
assert xyz.shape == (3, 3), "xyz must be a transformation matrix!!"
# If matrix, tile to VDF timeline
xyz = np.tile(xyz, (n_t, 1, 1))
else:
raise TypeError("Invalid type for xyz")
# Check that the transformation matrices are orthonormal direct
x_phat_ts, y_phat_ts, z_phat_ts = [xyz[:, :, i] for i in range(3)]
x_phat_ts /= np.linalg.norm(x_phat_ts, axis=1, keepdims=True)
y_phat_ts /= np.linalg.norm(y_phat_ts, axis=1, keepdims=True)
z_phat_ts = np.cross(x_phat_ts, y_phat_ts)
z_phat_ts /= np.linalg.norm(z_phat_ts, axis=1, keepdims=True)
y_phat_ts = np.cross(z_phat_ts, x_phat_ts)
# Construct the time series of transformation matrix
xyz_ts = np.transpose(
np.stack([x_phat_ts, y_phat_ts, z_phat_ts]),
[1, 2, 0],
)
# Set azimuthal angle projection grid
# d_phi = kwargs.get("d_phi", 2 * np.pi / n_ph)
# phi_grid = np.linspace(0, 2 * np.pi, n_ph) + d_phi / 2 # centers
# Number of Monte Carlo iterations and weights
n_mc = kwargs.get("n_mc", 100)
weight = kwargs.get("weight", None)
v_lim = kwargs.get("v_lim", [-np.inf, np.inf]) # Velocity grid limits
a_lim = kwargs.get("a_lim", [-180.0, 180.0]) # Azimuthal angle limits
# Spacecraft potential to account for photoelectrons. If not provided set
# to 0 V.
sc_pot = kwargs.get("sc_pot", ts_scalar(vdf_time.data, np.zeros(n_t)))
sc_pot = resample(sc_pot, vdf_time).data
# Threshold energy (e.g., to remove background noise). If not provided set
# to 0 eV.
lower_e_lim = kwargs.get("lower_e_lim", 0.0)
if isinstance(lower_e_lim, xr.DataArray):
# If time series, clip to time interval and resample to VDF's time line
lower_e_lim = resample(lower_e_lim, vdf_time).data
elif isinstance(lower_e_lim, float):
# If float, tile to VDF's timeline
lower_e_lim = np.tile(lower_e_lim, n_t)
else:
raise TypeError("Invalid lower_e_lim!!")
# Set particle specie mass from VDF's attribute "species"
if vdf.species.lower() == "electrons":
m_p = electron_mass
elif vdf.species.lower() == "ions":
m_p = proton_mass
else:
raise ValueError("Invalid species!!")
# Convert maximum energy from instrument to velocity (relativistically
# correct)
e_max = vdf_energy.data[0, -1] + vdf.attrs["delta_energy_plus"][0, -1]
gamma_max = 1 + electron_volt * e_max / (m_p * speed_of_light**2)
v_max = speed_of_light * np.sqrt(1 - 1 / gamma_max**2) # m/s
speed_grid = kwargs.get("vg", None) # TODO : check that for no input!!
speed_grid_edges = kwargs.get("vg_edges", None)
# initiate projected f
if speed_grid_edges is not None:
n_vg = len(speed_grid_edges) - 1
speed_grid_cart = None
speed_grid = speed_grid_edges[:-1] + 0.5 * np.diff(speed_grid_edges)
elif speed_grid is not None:
n_vg = len(speed_grid)
speed_grid_cart = None
else:
n_vg = 2 * n_en
speed_grid_cart = np.linspace(-v_max, v_max, n_vg, endpoint=True)
n_pr = int(dim[0])
f_g = np.zeros([n_t, *[n_vg] * n_pr])
all_v = {f"v{chr(120 + i)}": np.zeros((n_t, n_vg)) for i in range(n_pr)}
for i_t in tqdm.tqdm(range(n_t), ncols=60): # display progress
# 3d data matrix for time index
f_3d = np.squeeze(vdf_data.data[i_t, ...]) # s^3/m^6
f_3d = f_3d.astype(
np.float64,
).copy() # convert to C contiguous float64
# Energies
energy = vdf_energy.data[i_t, :]
energy = energy.astype(np.float64) # Convert to float64
# Assign zeros to phase-space density values corresponding to energies
# below the threshold energy or below the spacecraft potential if
# provided.
thresh_e = np.max([lower_e_lim[i_t], sc_pot[i_t]])
e_min_idx = np.where(energy - delta_energy_minu[i_t, :] > thresh_e)[0][0]
f_3d[:e_min_idx] = 0.0
# Correct energy shift due to spacecraft potential
energy -= sc_pot[i_t]
energy[energy < 0] = 0.0
# Convert energy to velocity (relativistically correct)
gamma = 1 + electron_volt * energy / (m_p * speed_of_light**2)
speed = speed_of_light * np.sqrt(1 - 1 / gamma**2) # m/s
# azimuthal angle
phi = vdf_phi.data[i_t, :].astype(np.float64) # in degrees
phi = np.deg2rad(phi - 180.0) # in radians
# elevation angle
theta = vdf_theta.data.astype(np.float64) # in degrees
theta = np.deg2rad(theta - 90.0) # in radians
# Set velocity projection grid.
if speed_grid is None:
if base == "pol":
if dim == "1d":
speed_grid = np.hstack((-np.flip(speed), speed))
else:
# speed_grid = speed
raise NotImplementedError(
"2d projection on polar grid is not ready yet!!",
)
else:
speed_grid = speed_grid_cart
else:
pass
options = {
"xyz": xyz_ts[i_t, ...],
"n_mc": n_mc,
"weight": weight,
"v_lim": v_lim,
"a_lim": a_lim,
"projection_dim": dim,
"projection_base": base,
"speed_grid_edges": speed_grid_edges,
}
tmpst = int_sph_dist(f_3d, speed, phi, theta, speed_grid, **options)
f_g[i_t, ...] = tmpst["f"]
for i in range(n_pr):
all_v[f"v{chr(120 + i)}"][i_t, :] = tmpst[f"v{chr(120 + i)}"] / 1e3 # km/s
# Build output as a time series with dimensions:
# - (time x vx) for 1D reduced distribution
# - (time x vx x vy) for 2D reduced distribution
coords = [
vdf_time.data,
*[all_v[f"v{chr(120 + i)}"][0, :] for i in range(n_pr)],
]
dims = ["time", *[f"v{chr(120 + i)}" for i in range(n_pr)]]
out = xr.DataArray(f_g, coords=coords, dims=dims)
return out