#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Built-in imports
import random
from math import asin, cos, sin, sqrt
# Third party imports
import numba
import numpy as np
__author__ = "Louis Richard"
__email__ = "louisr@irfu.se"
__copyright__ = "Copyright 2020-2023"
__license__ = "MIT"
__version__ = "2.4.2"
__status__ = "Prototype"
[docs]def int_sph_dist(vdf, velocity, phi, theta, velocity_grid, phi_grid, **kwargs):
r"""Integrate a spherical distribution function to a line/plane.
Parameters
----------
vdf : numpy.ndarray
Phase-space density skymap.
velocity : numpy.ndarray
Velocity of the instrument bins,
phi : numpy.ndarray
Azimuthal angle of the instrument bins.
theta : numpy.ndarray
Elevation angle of the instrument bins.
velocity_grid : numpy.ndarray
Velocity grid for interpolation.
phi_grid : numpy.ndarray
Azimuthal angle grid for interpolation.
**kwargs
Keyword arguments.
Returns
-------
pst : dict
Dictionary with the projected distribution and
corresponding velocity grid information.
"""
# Coordinates system transformation matrix
xyz = kwargs.get("xyz", np.eye(3))
# Make sure the transformation matrix is orthonormal.
x_phat = xyz[:, 0] / np.linalg.norm(xyz[:, 0]) # re-normalize
y_phat = xyz[:, 1] / np.linalg.norm(xyz[:, 1]) # re-normalize
z_phat = np.cross(x_phat, y_phat)
z_phat /= np.linalg.norm(z_phat)
y_phat = np.cross(z_phat, x_phat)
# Define the rotation matrix from original to primed frame
r_mat = np.transpose(np.stack([x_phat, y_phat, z_phat]), [1, 0])
# Number of Monte Carlo iterations and how number of MC points is
# weighted to data.
n_mc = kwargs.get("n_mc", 10)
weight = kwargs.get("weight", None)
# limit on out-of-plane velocity
v_lim = np.array(kwargs.get("v_int", [-np.inf, np.inf]), dtype=np.float64)
# limit on azymuthal angle from projection plane
a_lim = np.array(kwargs.get("a_int", [-180.0, 180.0]), dtype=np.float64)
a_lim = np.deg2rad(a_lim)
# Projection dimension and base
projection_base = kwargs.get("projection_base", "pol")
projection_dim = kwargs.get("projection_dim", "1d")
velocity_edges = kwargs.get("velocity_edges", None)
velocity_grid_edges = kwargs.get("velocity_grid_edges", None)
# Azimuthal and elevation angles steps. Assumed to be constant
# if not provided.
d_phi = np.abs(np.median(np.diff(phi))) * np.ones_like(phi)
d_phi = kwargs.get("d_phi", d_phi)
d_theta = np.abs(np.median(np.diff(theta))) * np.ones_like(theta)
d_theta = kwargs.get("d_theta", d_theta)
if velocity_edges is None:
d_v = np.hstack([np.diff(velocity[:2]), np.diff(velocity)])
d_v_m, d_v_p = [np.diff(velocity) / 2.0] * 2
else:
d_v_m = velocity - velocity_edges[:-1]
d_v_p = velocity_edges[1:] - velocity
d_v = d_v_m + d_v_p
# Overwrite projection dimension if azimuthal angle of projection
# plane is not provided. Set the azimuthal angle grid width.
if phi_grid is not None and projection_dim.lower() in ["2d", "3d"]:
d_phi_grid = np.median(np.diff(phi_grid))
else:
projection_dim = "1d"
d_phi_grid = 1.0
# Speed grid bins edges
if velocity_grid_edges is None:
velocity_grid_diff = np.diff(velocity_grid)
velocity_grid_edges = np.zeros(len(velocity_grid) + 1)
velocity_grid_edges[0] = velocity_grid[0] - velocity_grid_diff[0] / 2.0
velocity_grid_edges[1:-1] = velocity_grid[:-1] + velocity_grid_diff / 2.0
velocity_grid_edges[-1] = velocity_grid[-1] + velocity_grid_diff[-1] / 2.0
else:
velocity_grid = velocity_grid_edges[:-1] + np.diff(velocity_grid_edges) / 2.0
if projection_base == "pol":
d_v_grid = np.diff(velocity_grid_edges)
if projection_dim == "2d":
raise NotImplementedError(
"2d projection on polar grid is not ready yet!!",
)
else:
mean_diff = np.mean(np.diff(velocity_grid))
msg = "For a cartesian grid, all velocity bins must be equal!!"
assert (np.diff(velocity_grid) / mean_diff - 1 < 1e-2).all(), msg
d_v_grid = mean_diff
# Weighting of number of Monte Carlo particles
n_sum = n_mc * np.sum(vdf != 0) # total number of Monte Carlo particles
if weight == "lin":
n_mc_mat = np.ceil(n_sum / np.sum(vdf) * vdf)
elif weight == "log":
n_mc_mat = np.ceil(
n_sum / np.sum(np.log10(vdf + 1.0)) * np.log10(vdf + 1.0),
)
else:
n_mc_mat = np.zeros_like(vdf)
n_mc_mat[vdf != 0] = n_mc
n_mc_mat = n_mc_mat.astype(int)
if projection_base == "pol":
d_a_grid = velocity_grid ** (int(projection_dim[0]) - 1) * d_phi_grid * d_v_grid
d_a_grid = d_a_grid.astype(np.float64)
else:
d_a_grid = d_v_grid ** int(projection_dim[0])
d_a_grid = d_a_grid.astype(np.float64)
if projection_base == "cart" and projection_dim == "2d":
f_g = _mc_cart_2d(
vdf,
velocity,
phi,
theta,
d_v,
d_v_m,
d_phi,
d_theta,
velocity_grid_edges,
d_a_grid,
v_lim,
a_lim,
n_mc_mat,
r_mat,
)
elif projection_base == "cart" and projection_dim == "3d":
f_g = _mc_cart_3d(
vdf,
velocity,
phi,
theta,
d_v,
d_v_m,
d_phi,
d_theta,
velocity_grid_edges,
d_a_grid,
v_lim,
a_lim,
n_mc_mat,
r_mat,
)
elif projection_base == "pol" and projection_dim == "1d":
f_g = _mc_pol_1d(
vdf,
velocity,
phi,
theta,
d_v,
d_v_m,
d_phi,
d_theta,
velocity_grid_edges,
d_a_grid,
v_lim,
a_lim,
n_mc_mat,
r_mat,
)
else:
raise NotImplementedError(
f"{projection_dim} projection on {projection_base} grid is not ready yet!!",
)
if projection_dim == "2d" and projection_base == "cart":
pst = {
"f": f_g,
"vx": velocity_grid,
"vy": velocity_grid,
"vx_edges": velocity_grid_edges,
"vy_edges": velocity_grid_edges,
}
elif projection_dim == "3d" and projection_base == "cart":
pst = {
"f": f_g,
"vx": velocity_grid,
"vy": velocity_grid,
"vz": velocity_grid,
"vx_edges": velocity_grid_edges,
"vy_edges": velocity_grid_edges,
"vz_edges": velocity_grid_edges,
}
else:
pst = {"f": f_g, "vx": velocity_grid, "vx_edges": velocity_grid_edges}
return pst
@numba.jit(cache=True, nogil=True, parallel=True, nopython=True)
def _mc_pol_1d(
vdf,
v,
phi,
theta,
d_v,
d_v_m,
d_phi,
d_theta,
vg_edges,
d_a_grid,
v_lim,
a_lim,
n_mc,
r_mat,
):
r"""Perform 3D Monte-Carlo interpolation of the VDFs
Parameters
----------
vdf : double
3D skymap particle velocity distribution function.
v : double
1D array of instrument velocity bins centers.
phi : double
1D array of instrument azimuthal angles bins centers.
theta : double
1D array of instrument elevation angles bins centers.
d_v : double
1D array of instrument velocity bins widths.
d_v_m : double
1D array of minus velocity from bins centers.
d_phi : double
1D array of instrument azimuthal angles bins widths.
d_theta : double
1D array of instrument elevation angles bins widths.
vg_egdes : double
Bin centers of the velocity of the projection grid.
d_a_grid : double
Bin centers of the azimuthal angle of the projection in radians in
the span [0,2*pi]. If this input is given, the projection will be 2D.
If it is omitted, the projection will be 1D.
v_lim : double
Limits on the out-of-plane velocity interval in 2D and "transverse"
velocity in 1D.
a_lim : double
Angular limit in degrees, can be combined with v_lim.
n_mc : double
Number of Monte-Carlo particle for the corresponding instrument bins.
r_mat : double
Frame transformation matrix.
Returns
-------
f_g : double
Reduced/interpolated distribution.
"""
n_v, n_ph, n_th = vdf.shape
n_vg = len(vg_edges) - 1
f_g = np.zeros(n_vg)
for i in numba.prange(n_v):
for j in range(n_ph):
for k in range(n_th):
n_mc_ijk = n_mc[i, j, k]
if vdf[i][j][k] == 0.0:
continue
dtau_ijk = v[i] ** 2 * np.cos(theta[k]) * d_v[i] * d_phi[j] * d_theta[k]
c_ijk = dtau_ijk / n_mc_ijk
f_ijk = vdf[i, j, k]
for l_mc in range(n_mc_ijk):
# Generate Monte-Carlo particle speed, phi, and theta
if l_mc == 0:
# First Monte-Carlo particle is set at the bin center
d_v_mc = 0.0
d_phi_mc = 0.0
else:
d_v_mc = -random.random() * d_v[i] - d_v_m[0]
d_phi_mc = (random.random() - 0.5) * d_phi[j]
# convert instrument bin to cartesian velocity
v_mc = v[i] + d_v_mc
phi_mc = phi[j] + d_phi_mc
# This is needed to distribute the points evenly in space
# (more points further 'equatorward' since the slice is
# wider there).
if l_mc == 0:
theta_mc = theta[k]
else:
theta_1 = theta[k] - 0.5 * d_theta[k]
theta_2 = theta[k] + 0.5 * d_theta[k]
sin_theta_1 = sin(theta_1)
sin_theta_2 = sin(theta_2)
d_sin_theta = sin_theta_2 - sin_theta_1
sin_theta_mc = sin_theta_1 + random.random() * d_sin_theta
theta_mc = asin(sin_theta_mc)
v_x = v_mc * cos(theta_mc) * cos(phi_mc)
v_y = v_mc * cos(theta_mc) * sin(phi_mc)
v_z = v_mc * sin(theta_mc)
# Get velocities in primed coordinate system
# vxp = [vx, vy, vz] * xphat'; % all MC points
v_x_p = r_mat[0, 0] * v_x + r_mat[1, 0] * v_y + r_mat[2, 0] * v_z
v_y_p = r_mat[0, 1] * v_x + r_mat[1, 1] * v_y + r_mat[2, 1] * v_z
v_z_p = r_mat[0, 2] * v_x + r_mat[1, 2] * v_y + r_mat[2, 2] * v_z
# get transverse velocity sqrt(vy^2+vz^2)
v_z_p = sqrt(v_y_p**2 + v_z_p**2)
alpha = asin(v_z_p / v_mc)
# Check if the velocity is within the v_lim and a_lim bounds
use_point = v_lim[0] <= v_z_p < v_lim[1]
use_point = use_point and (a_lim[0] <= alpha < a_lim[1])
# Find which bin the MC point falls into
v_p = v_x_p
if v_p > vg_edges[-1] or v_p < vg_edges[0]:
continue
# searchsorted returns the index where the element should be
# inserted to maintain order, so we subtract 1 to get the index of
# the bin that contains the element
i_vxg = np.searchsorted(vg_edges, v_p) - 1
# Special case for the first bin edge, which is the lower limit of
# the first bin and should be included in the first bin
if i_vxg == -1:
i_vxg = 0
d_a = d_a_grid[i_vxg]
if use_point * (i_vxg < n_vg):
f_g[i_vxg] += f_ijk * c_ijk / d_a
return f_g
@numba.jit(cache=True, nogil=True, parallel=True, nopython=True)
def _mc_cart_2d(
vdf,
v,
phi,
theta,
d_v,
d_v_m,
d_phi,
d_theta,
vg_edges,
d_a_grid,
v_lim,
a_lim,
n_mc,
r_mat,
):
r"""Perform 3D Monte-Carlo interpolation of the VDFs
Parameters
----------
vdf : double
3D skymap particle velocity distribution function.
v : double
1D array of instrument velocity bins centers.
phi : double
1D array of instrument azimuthal angles bins centers.
theta : double
1D array of instrument elevation angles bins centers.
d_v : double
1D array of instrument velocity bins widths.
d_v_m : double
1D array of minus velocity from bins centers.
d_phi : double
1D array of instrument azimuthal angles bins widths.
d_theta : double
1D array of instrument elevation angles bins widths.
vg_egdes : double
Bin centers of the velocity of the projection grid.
d_a_grid : double
Bin centers of the azimuthal angle of the projection in radians in
the span [0,2*pi]. If this input is given, the projection will be 2D.
If it is omitted, the projection will be 1D.
v_lim : double
Limits on the out-of-plane velocity interval in 2D and "transverse"
velocity
in 1D.
a_lim : double
Angular limit in degrees, can be combined with v_lim.
n_mc : double
Number of Monte-Carlo particle for the corresponding instrument bins.
r_mat : double
Frame transformation matrix.
Returns
-------
f_g : double
Reduced/interpolated distribution.
"""
# Get dimension of the instrument and interpolation grid.
n_v, n_ph, n_th = vdf.shape
n_vg = len(vg_edges) - 1
f_g = np.zeros((n_vg, n_vg))
for i in range(n_v):
for j in range(n_ph):
for k in range(n_th):
n_mc_ijk = n_mc[i, j, k]
if vdf[i][j][k] == 0.0:
continue
dtau_ijk = v[i] ** 2 * cos(theta[k]) * d_v[i] * d_phi[j] * d_theta[k]
c_ijk = dtau_ijk / n_mc_ijk
f_ijk = vdf[i, j, k]
for l_mc in range(n_mc_ijk):
# Generate Monte-Carlo particle speed, phi, and theta
if l_mc == 0:
# First Monte-Carlo particle is set at the bin center
d_v_mc = 0.0
d_phi_mc = 0.0
else:
d_v_mc = -random.random() * d_v[i] - d_v_m[0]
d_phi_mc = (random.random() - 0.5) * d_phi[j]
# convert instrument bin to cartesian velocity
v_mc = v[i] + d_v_mc
phi_mc = phi[j] + d_phi_mc
# This is needed to distribute the points evenly in space
# (more points further 'equatorward' since the slice is
# wider there).
if l_mc == 0:
theta_mc = theta[k]
else:
theta_1 = theta[k] - 0.5 * d_theta[k]
theta_2 = theta[k] + 0.5 * d_theta[k]
sin_theta_1 = sin(theta_1)
sin_theta_2 = sin(theta_2)
d_sin_theta = sin_theta_2 - sin_theta_1
sin_theta_mc = sin_theta_1 + random.random() * d_sin_theta
theta_mc = asin(sin_theta_mc)
# Calculate velocity of the Monte-Carlo particle in
# cartesian coordinates
v_x = v_mc * cos(theta_mc) * cos(phi_mc)
v_y = v_mc * cos(theta_mc) * sin(phi_mc)
v_z = v_mc * sin(theta_mc)
# Get velocities in primed coordinate system
v_x_p = r_mat[0, 0] * v_x + r_mat[1, 0] * v_y + r_mat[2, 0] * v_z
v_y_p = r_mat[0, 1] * v_x + r_mat[1, 1] * v_y + r_mat[2, 1] * v_z
v_z_p = r_mat[0, 2] * v_x + r_mat[1, 2] * v_y + r_mat[2, 2] * v_z
# Elevation angle from the projection plane
alpha = asin(v_z_p / v_mc)
# Check if the velocity is within the v_lim and a_lim bounds
use_point = v_lim[0] <= v_z_p < v_lim[1]
use_point = use_point and (a_lim[0] <= alpha < a_lim[1])
# Check if the velocity along the x direction is outside of the grid
if v_x_p > vg_edges[-1] or v_x_p < vg_edges[0]:
continue
# Check if the velocity along the y direction is outside of the grid
if v_y_p > vg_edges[-1] or v_y_p < vg_edges[0]:
continue
# searchsorted returns the index where the element should be
# inserted to maintain order, so we subtract 1 to get the index of
# the bin that contains the element
i_vxg = np.searchsorted(vg_edges, v_x_p) - 1
i_vyg = np.searchsorted(vg_edges, v_y_p) - 1
# Special case for the first bin edge, which is the lower limit of
# the first bin and should be included in the first bin
if i_vxg == -1:
i_vxg = 0
if i_vyg == -1:
i_vyg = 0
if use_point:
f_g[i_vxg, i_vyg] += f_ijk * c_ijk / d_a_grid
return f_g
@numba.jit(cache=True, nogil=True, parallel=True, nopython=True)
def _mc_cart_3d(
vdf,
v,
phi,
theta,
d_v,
d_v_m,
d_phi,
d_theta,
vg_edges,
d_a_grid,
v_lim,
a_lim,
n_mc,
r_mat,
):
r"""Perform 3D Monte-Carlo interpolation of the VDFs
Parameters
----------
vdf : numpy.ndarray
3D skymap particle velocity distribution function.
v : numpy.ndarray
1D array of instrument velocity bins centers.
phi : numpy.ndarray
1D array of instrument azimuthal angles bins centers.
theta : numpy.ndarray
1D array of instrument elevation angles bins centers.
d_v : numpy.ndarray
1D array of instrument velocity bins widths.
d_v_m : numpy.ndarray
1D array of minus velocity from bins centers.
d_phi : numpy.ndarray
1D array of instrument azimuthal angles bins widths.
d_theta : numpy.ndarray
1D array of instrument elevation angles bins widths.
vg_egdes : double
Bin centers of the velocity of the projection grid.
d_a_grid : double
Bin centers of the azimuthal angle of the projection in radians in
the span [0,2*pi]. If this input is given, the projection will be 2D.
If it is omitted, the projection will be 1D.
v_lim : double
Limits on the out-of-plane velocity interval in 2D and "transverse"
velocity in 1D.
a_lim : double
Angular limit in degrees, can be combined with v_lim.
n_mc : double
Number of Monte-Carlo particle for the corresponding instrument bins.
r_mat : double
Frame transformation matrix.
Returns
-------
f_g : double
Reduced/interpolated distribution.
"""
# Get dimension of the instrument and interpolation grid.
n_v, n_ph, n_th = vdf.shape
n_vg = len(vg_edges) - 1
f_g = np.zeros((n_vg, n_vg, n_vg))
for i in numba.prange(n_v):
for j in range(n_ph):
for k in range(n_th):
n_mc_ijk = n_mc[i, j, k]
if vdf[i][j][k] == 0.0:
continue
dtau_ijk = v[i] ** 2 * cos(theta[k]) * d_v[i] * d_phi[j] * d_theta[k]
c_ijk = dtau_ijk / n_mc_ijk
f_ijk = vdf[i, j, k]
for l_mc in range(n_mc_ijk):
# Generate Monte-Carlo particle speed, phi, and theta
if l_mc == 0:
# First Monte-Carlo particle is set at the bin center
d_v_mc = 0.0
d_phi_mc = 0.0
else:
d_v_mc = -random.random() * d_v[i] - d_v_m[0]
d_phi_mc = (random.random() - 0.5) * d_phi[j]
# convert instrument bin to cartesian velocity
v_mc = v[i] + d_v_mc
phi_mc = phi[j] + d_phi_mc
# This is needed to distribute the points evenly in space
# (more points further 'equatorward' since the slice is
# wider there).
if l_mc == 0:
theta_mc = theta[k]
else:
theta_1 = theta[k] - 0.5 * d_theta[k]
theta_2 = theta[k] + 0.5 * d_theta[k]
sin_theta_1 = sin(theta_1)
sin_theta_2 = sin(theta_2)
d_sin_theta = sin_theta_2 - sin_theta_1
sin_theta_mc = sin_theta_1 + random.random() * d_sin_theta
theta_mc = asin(sin_theta_mc)
v_x = v_mc * cos(theta_mc) * cos(phi_mc)
v_y = v_mc * cos(theta_mc) * sin(phi_mc)
v_z = v_mc * sin(theta_mc)
# Get velocities in primed coordinate system
# vxp = [vx, vy, vz] * xphat'; % all MC points
v_x_p = r_mat[0, 0] * v_x + r_mat[1, 0] * v_y + r_mat[2, 0] * v_z
v_y_p = r_mat[0, 1] * v_x + r_mat[1, 1] * v_y + r_mat[2, 1] * v_z
v_z_p = r_mat[0, 2] * v_x + r_mat[1, 2] * v_y + r_mat[2, 2] * v_z
alpha = asin(v_z_p / v_mc)
# Check if the velocity is within the v_lim and a_lim bounds
use_point = v_lim[0] <= v_z_p < v_lim[1]
use_point = use_point and (a_lim[0] <= alpha < a_lim[1])
if v_x_p > vg_edges[-1] or v_x_p < vg_edges[0]:
continue
if v_y_p > vg_edges[-1] or v_y_p < vg_edges[0]:
continue
if v_z_p > vg_edges[-1] or v_z_p < vg_edges[0]:
continue
# searchsorted returns the index where the element should be
# inserted to maintain order, so we subtract 1 to get the index of
# the bin that contains the element
i_vxg = np.searchsorted(vg_edges, v_x_p) - 1
i_vyg = np.searchsorted(vg_edges, v_y_p) - 1
i_vzg = np.searchsorted(vg_edges, v_z_p) - 1
# Special case for the last bin edge, which is the upper limit of
# the last bin and should be included in the last bin
if i_vxg == -1:
i_vxg = 0
if i_vyg == -1:
i_vyg = 0
if i_vzg == -1:
i_vzg = 0
if use_point:
f_g[i_vxg, i_vyg, i_vzg] += f_ijk * c_ijk / d_a_grid
return f_g