#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Built-in imports
import random
from math import asin, cos, sin, sqrt
# Third party imports
import numba
import numpy as np
__author__ = "Louis Richard"
__email__ = "louisr@irfu.se"
__copyright__ = "Copyright 2020-2023"
__license__ = "MIT"
__version__ = "2.4.2"
__status__ = "Prototype"
[docs]def int_sph_dist(vdf, speed, phi, theta, speed_grid, **kwargs):
r"""Integrate a spherical distribution function to a line/plane.
Parameters
----------
vdf : numpy.ndarray
Phase-space density skymap.
speed : numpy.ndarray
Velocity of the instrument bins,
phi : numpy.ndarray
Azimuthal angle of the instrument bins.
theta : numpy.ndarray
Elevation angle of the instrument bins.
speed_grid : numpy.ndarray
Velocity grid for interpolation.
**kwargs
Keyw
Returns
-------
"""
# Coordinates system transformation matrix
xyz = kwargs.get("xyz", np.eye(3))
# Number of Monte Carlo iterations and how number of MC points is
# weighted to data.
n_mc = kwargs.get("n_mc", 10)
weight = kwargs.get("weight", None)
# limit on out-of-plane velocity and azimuthal angle
v_lim = np.array(kwargs.get("v_lim", [-np.inf, np.inf]), dtype=np.float64)
a_lim = np.array(kwargs.get("a_lim", [-180.0, 180.0]), dtype=np.float64)
a_lim = np.deg2rad(a_lim)
# Projection dimension and base
projection_base = kwargs.get("projection_base", "pol")
projection_dim = kwargs.get("projection_dim", "1d")
speed_edges = kwargs.get("speed_edges", None)
speed_grid_edges = kwargs.get("speed_grid_edges", None)
# Azimuthal and elevation angles steps. Assumed to be constant
# if not provided.
d_phi = np.abs(np.median(np.diff(phi))) * np.ones_like(phi)
d_phi = kwargs.get("d_phi", d_phi)
d_theta = np.abs(np.median(np.diff(theta))) * np.ones_like(theta)
d_theta = kwargs.get("d_theta", d_theta)
# azimuthal angle of projection plane
n_az_g = len(phi)
d_phi_g = 2 * np.pi / n_az_g
phi_grid = np.linspace(0, 2 * np.pi - d_phi_g, n_az_g) + d_phi_g / 2
phi_grid = kwargs.get("phi_grid", phi_grid)
# Overwrite projection dimension if azimuthal angle of projection
# plane is not provided. Set the azimuthal angle grid width.
if phi_grid is not None and projection_dim.lower() in ["2d", "3d"]:
d_phi_grid = np.median(np.diff(phi_grid))
else:
projection_dim = "1d"
d_phi_grid = 1.0
# Make sure the transformation matrix is orthonormal.
x_phat = xyz[:, 0] / np.linalg.norm(xyz[:, 0]) # re-normalize
y_phat = xyz[:, 1] / np.linalg.norm(xyz[:, 1]) # re-normalize
z_phat = np.cross(x_phat, y_phat)
z_phat /= np.linalg.norm(z_phat)
y_phat = np.cross(z_phat, x_phat)
r_mat = np.transpose(np.stack([x_phat, y_phat, z_phat]), [1, 0])
if speed_edges is None:
d_v = np.hstack([np.diff(speed[:2]), np.diff(speed)])
d_v_m, d_v_p = [np.diff(speed) / 2.0] * 2
else:
d_v_m = speed - speed_edges[:-1]
d_v_p = speed_edges[1:] - speed
d_v = d_v_m + d_v_p
# Speed grid bins edges
if speed_grid_edges is None:
speed_grid_edges = np.zeros(len(speed_grid) + 1)
speed_grid_edges[0] = speed_grid[0] - np.diff(speed_grid[:2]) / 2.0
speed_grid_edges[1:-1] = speed_grid[:-1] + np.diff(speed_grid) / 2.0
speed_grid_edges[-1] = speed_grid[-1] + np.diff(speed_grid[-2:]) / 2.0
else:
speed_grid = speed_grid_edges[:-1] + np.diff(speed_grid_edges) / 2.0
if projection_base == "pol":
d_v_grid = np.diff(speed_grid_edges)
else:
mean_diff = np.mean(np.diff(speed_grid))
msg = "For a cartesian grid, all velocity bins must be equal!!"
assert (np.diff(speed_grid) / mean_diff - 1 < 1e-2).all(), msg
d_v_grid = mean_diff
# Weighting of number of Monte Carlo particles
n_sum = n_mc * np.sum(vdf != 0) # total number of Monte Carlo particles
if weight == "lin":
n_mc_mat = np.ceil(n_sum / np.sum(vdf) * vdf)
elif weight == "log":
n_mc_mat = np.ceil(
n_sum / np.sum(np.log10(vdf + 1)) * np.log10(vdf + 1),
)
else:
n_mc_mat = np.zeros_like(vdf)
n_mc_mat[vdf != 0] = n_mc
n_mc_mat = n_mc_mat.astype(int)
if projection_base == "cart" and projection_dim == "2d":
d_a_grid = d_v_grid**2
f_g = _mc_cart_2d(
vdf,
speed,
phi,
theta,
d_v,
d_v_m,
d_phi,
d_theta,
speed_grid_edges,
d_a_grid,
v_lim,
a_lim,
n_mc_mat,
r_mat,
)
elif projection_base == "cart" and projection_dim == "3d":
d_a_grid = d_v_grid**3
f_g = _mc_cart_3d(
vdf,
speed,
phi,
theta,
d_v,
d_v_m,
d_phi,
d_theta,
speed_grid_edges,
d_a_grid,
v_lim,
a_lim,
n_mc_mat,
r_mat,
)
else:
# Area or line element (primed)
d_a_grid = speed_grid ** (int(projection_dim[0]) - 1) * d_phi_grid * d_v_grid
d_a_grid = d_a_grid.astype(np.float64)
if projection_dim == "1d":
f_g = _mc_pol_1d(
vdf,
speed,
phi,
theta,
d_v,
d_v_m,
d_phi,
d_theta,
speed_grid_edges,
d_a_grid,
v_lim,
a_lim,
n_mc_mat,
r_mat,
)
else:
raise NotImplementedError(
"2d projection on polar grid is not ready yet!!",
)
if projection_dim == "2d" and projection_base == "cart":
pst = {
"f": f_g,
"vx": speed_grid,
"vy": speed_grid,
"vx_edges": speed_grid_edges,
"vy_edges": speed_grid_edges,
}
elif projection_dim == "3d" and projection_base == "cart":
pst = {
"f": f_g,
"vx": speed_grid,
"vy": speed_grid,
"vz": speed_grid,
"vx_edges": speed_grid_edges,
"vy_edges": speed_grid_edges,
"vz_edges": speed_grid_edges,
}
else:
pst = {"f": f_g, "vx": speed_grid, "vx_edges": speed_grid_edges}
return pst
@numba.jit(cache=True, nogil=True, parallel=True, nopython=True)
def _mc_pol_1d(
vdf,
v,
phi,
theta,
d_v,
d_v_m,
d_phi,
d_theta,
vg_edges,
d_a_grid,
v_lim,
a_lim,
n_mc,
r_mat,
):
r"""Perform 3D Monte-Carlo interpolation of the VDFs
Parameters
----------
vdf : double
3D skymap particle velocity distribution function.
v : double
1D array of instrument speed bins centers.
phi : double
1D array of instrument azimuthal angles bins centers.
theta : double
1D array of instrument elevation angles bins centers.
d_v : double
1D array of instrument speed bins widths.
d_v_m : double
1D array of minus velocity from bins centers.
d_phi : double
1D array of instrument azimuthal angles bins widths.
d_theta : double
1D array of instrument elevation angles bins widths.
vg_egdes : double
Bin centers of the velocity of the projection grid.
d_a_grid : double
Bin centers of the azimuthal angle of the projection in radians in
the span [0,2*pi]. If this input is given, the projection will be 2D.
If it is omitted, the projection will be 1D.
v_lim : double
Limits on the out-of-plane velocity interval in 2D and "transverse"
velocity in 1D.
a_lim : double
Angular limit in degrees, can be combined with v_lim.
n_mc : double
Number of Monte-Carlo particle for the corresponding instrument bins.
r_mat : double
Frame transformation matrix.
Returns
-------
f_g : double
Reduced/interpolated distribution.
"""
n_v, n_ph, n_th = vdf.shape
n_vg = len(vg_edges) - 1
f_g = np.zeros(n_vg)
for i in numba.prange(n_v):
for j in range(n_ph):
for k in range(n_th):
n_mc_ijk = n_mc[i, j, k]
if vdf[i][j][k] == 0.0:
continue
dtau_ijk = v[i] ** 2 * np.cos(theta[k]) * d_v[i] * d_phi[j] * d_theta[k]
c_ijk = dtau_ijk / n_mc_ijk
f_ijk = vdf[i, j, k]
for _ in range(n_mc_ijk):
d_v_mc = -random.random() * d_v[i] - d_v_m[0]
d_phi_mc = (random.random() - 0.5) * d_phi[j]
d_the_mc = (random.random() - 0.5) * d_theta[k]
# convert instrument bin to cartesian velocity
v_mc = v[i] + d_v_mc
phi_mc = phi[j] + d_phi_mc
theta_mc = theta[k] + d_the_mc
v_x = v_mc * cos(theta_mc) * cos(phi_mc)
v_y = v_mc * cos(theta_mc) * sin(phi_mc)
v_z = v_mc * sin(theta_mc)
# Get velocities in primed coordinate system
# vxp = [vx, vy, vz] * xphat'; % all MC points
v_x_p = r_mat[0, 0] * v_x + r_mat[1, 0] * v_y + r_mat[2, 0] * v_z
v_y_p = r_mat[0, 1] * v_x + r_mat[1, 1] * v_y + r_mat[2, 1] * v_z
v_z_p = r_mat[0, 2] * v_x + r_mat[1, 2] * v_y + r_mat[2, 2] * v_z
v_z_p = sqrt(pow(v_y_p, 2) + pow(v_z_p, 2))
alpha = asin(v_z_p / v_mc)
use_point = (v_z_p >= v_lim[0]) * (v_z_p < v_lim[1])
use_point = use_point * (alpha >= a_lim[0]) * (alpha < a_lim[1])
i_vxg = np.searchsorted(vg_edges[:-2], v_x_p)
d_a = d_a_grid[i_vxg]
if use_point * (i_vxg < n_vg):
f_g[i_vxg] += f_ijk * c_ijk / d_a
return f_g
@numba.jit(cache=True, nogil=True, parallel=True, nopython=True)
def _mc_cart_3d(
vdf,
v,
phi,
theta,
d_v,
d_v_m,
d_phi,
d_theta,
vg_edges,
d_a_grid,
v_lim,
a_lim,
n_mc,
r_mat,
):
r"""Perform 3D Monte-Carlo interpolation of the VDFs
Parameters
----------
vdf : numpy.ndarray
3D skymap particle velocity distribution function.
v : numpy.ndarray
1D array of instrument speed bins centers.
phi : numpy.ndarray
1D array of instrument azimuthal angles bins centers.
theta : numpy.ndarray
1D array of instrument elevation angles bins centers.
d_v : numpy.ndarray
1D array of instrument speed bins widths.
d_v_m : numpy.ndarray
1D array of minus velocity from bins centers.
d_phi : numpy.ndarray
1D array of instrument azimuthal angles bins widths.
d_theta : numpy.ndarray
1D array of instrument elevation angles bins widths.
vg_egdes : double
Bin centers of the velocity of the projection grid.
d_a_grid : double
Bin centers of the azimuthal angle of the projection in radians in
the span [0,2*pi]. If this input is given, the projection will be 2D.
If it is omitted, the projection will be 1D.
v_lim : double
Limits on the out-of-plane velocity interval in 2D and "transverse"
velocity in 1D.
a_lim : double
Angular limit in degrees, can be combined with v_lim.
n_mc : double
Number of Monte-Carlo particle for the corresponding instrument bins.
r_mat : double
Frame transformation matrix.
Returns
-------
f_g : double
Reduced/interpolated distribution.
"""
# Get dimension of the instrument and interpolation grid.
n_v, n_ph, n_th = vdf.shape
n_vg = len(vg_edges) - 1
f_g = np.zeros((n_vg, n_vg, n_vg))
for i in numba.prange(n_v):
for j in range(n_ph):
for k in range(n_th):
n_mc_ijk = n_mc[i, j, k]
if vdf[i][j][k] == 0.0:
continue
dtau_ijk = v[i] ** 2 * cos(theta[k]) * d_v[i] * d_phi[j] * d_theta[k]
c_ijk = dtau_ijk / n_mc_ijk
f_ijk = vdf[i, j, k]
for _ in range(n_mc_ijk):
d_v_mc = -random.random() * d_v[i] - d_v_m[0]
d_phi_mc = (random.random() - 0.5) * d_phi[j]
d_the_mc = (random.random() - 0.5) * d_theta[k]
# convert instrument bin to cartesian velocity
v_mc = v[i] + d_v_mc
phi_mc = phi[j] + d_phi_mc
theta_mc = theta[k] + d_the_mc
v_x = v_mc * cos(theta_mc) * cos(phi_mc)
v_y = v_mc * cos(theta_mc) * sin(phi_mc)
v_z = v_mc * sin(theta_mc)
# Get velocities in primed coordinate system
# vxp = [vx, vy, vz] * xphat'; % all MC points
v_x_p = r_mat[0, 0] * v_x + r_mat[1, 0] * v_y + r_mat[2, 0] * v_z
v_y_p = r_mat[0, 1] * v_x + r_mat[1, 1] * v_y + r_mat[2, 1] * v_z
v_z_p = r_mat[0, 2] * v_x + r_mat[1, 2] * v_y + r_mat[2, 2] * v_z
# velocity within [-dVm, +dVp]
alpha = asin(v_z_p / v_mc)
use_point = v_z_p >= v_lim[0] * v_z_p < v_lim[1]
use_point = use_point * alpha >= a_lim[0] * alpha < a_lim[1]
i_vxg = np.searchsorted(vg_edges[:-2], v_x_p)
i_vyg = np.searchsorted(vg_edges[:-2], v_y_p)
i_vzg = np.searchsorted(vg_edges[:-2], v_z_p)
if use_point:
f_g[i_vxg, i_vyg, i_vzg] += f_ijk * c_ijk / d_a_grid
return f_g
@numba.jit(cache=True, nogil=True, parallel=True, nopython=True)
def _mc_cart_2d(
vdf,
v,
phi,
theta,
d_v,
d_v_m,
d_phi,
d_theta,
vg_edges,
d_a_grid,
v_lim,
a_lim,
n_mc,
r_mat,
):
r"""Perform 3D Monte-Carlo interpolation of the VDFs
Parameters
----------
vdf : double
3D skymap particle velocity distribution function.
v : double
1D array of instrument speed bins centers.
phi : double
1D array of instrument azimuthal angles bins centers.
theta : double
1D array of instrument elevation angles bins centers.
d_v : double
1D array of instrument speed bins widths.
d_v_m : double
1D array of minus velocity from bins centers.
d_phi : double
1D array of instrument azimuthal angles bins widths.
d_theta : double
1D array of instrument elevation angles bins widths.
vg_egdes : double
Bin centers of the velocity of the projection grid.
d_a_grid : double
Bin centers of the azimuthal angle of the projection in radians in
the span [0,2*pi]. If this input is given, the projection will be 2D.
If it is omitted, the projection will be 1D.
v_lim : double
Limits on the out-of-plane velocity interval in 2D and "transverse"
velocity
in 1D.
a_lim : double
Angular limit in degrees, can be combined with v_lim.
n_mc : double
Number of Monte-Carlo particle for the corresponding instrument bins.
r_mat : double
Frame transformation matrix.
Returns
-------
f_g : double
Reduced/interpolated distribution.
"""
# Get dimension of the instrument and interpolation grid.
n_v, n_ph, n_th = vdf.shape
n_vg = len(vg_edges) - 1
f_g = np.zeros((n_vg, n_vg))
for i in range(n_v):
for j in range(n_ph):
for k in range(n_th):
n_mc_ijk = n_mc[i, j, k]
if vdf[i][j][k] == 0.0:
continue
dtau_ijk = v[i] ** 2 * cos(theta[k]) * d_v[i] * d_phi[j] * d_theta[k]
c_ijk = dtau_ijk / n_mc_ijk
f_ijk = vdf[i, j, k]
for _ in range(n_mc_ijk):
d_v_mc = -random.random() * d_v[i] - d_v_m[0]
d_phi_mc = (random.random() - 0.5) * d_phi[j]
d_the_mc = (random.random() - 0.5) * d_theta[k]
# convert instrument bin to cartesian velocity
v_mc = v[i] + d_v_mc
phi_mc = phi[j] + d_phi_mc
theta_mc = theta[k] + d_the_mc
v_x = v_mc * cos(theta_mc) * cos(phi_mc)
v_y = v_mc * cos(theta_mc) * sin(phi_mc)
v_z = v_mc * sin(theta_mc)
# Get velocities in primed coordinate system
# vxp = [vx, vy, vz] * xphat'; % all MC points
v_x_p = r_mat[0, 0] * v_x + r_mat[1, 0] * v_y + r_mat[2, 0] * v_z
v_y_p = r_mat[0, 1] * v_x + r_mat[1, 1] * v_y + r_mat[2, 1] * v_z
v_z_p = r_mat[0, 2] * v_x + r_mat[1, 2] * v_y + r_mat[2, 2] * v_z
# velocity within [-dVm, +dVp]
alpha = asin(v_z_p / v_mc)
use_point = v_z_p >= v_lim[0] * v_z_p < v_lim[1]
use_point = use_point * alpha >= a_lim[0] * alpha < a_lim[1]
i_vxg = np.searchsorted(vg_edges, v_x_p)
i_vyg = np.searchsorted(vg_edges, v_y_p)
if use_point:
f_g[i_vxg, i_vyg] += f_ijk * c_ijk / d_a_grid
return f_g