Skip to content
Merged
7 changes: 7 additions & 0 deletions imap_processing/ialirt/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,13 @@ class IalirtSwapiConstants:
e_charge = 1.602176634e-19 # electronic charge, [C]
speed_coeff = np.sqrt(2 * e_charge / prot_mass) / 1e3

# temporary correction factor based on WIND data available
# overlapping with the first ~month of SWAPI data.
# to be replaced once SWAPI's L3 processing pipeline is finalized
# this will increase the model count by a factor of e^1,
# changing the output density by a factor of e^-1.
temporary_density_factor = np.exp(1)


class StationProperties(NamedTuple):
"""Class that represents properties of ground stations."""
Expand Down
69 changes: 59 additions & 10 deletions imap_processing/ialirt/l0/process_swapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
logger = logging.getLogger(__name__)

NUM_IALIRT_ENERGY_STEPS = 63
FILLVAL_FLOAT32 = -1.0e31


def count_rate(
Expand Down Expand Up @@ -57,6 +58,9 @@ def count_rate(
speed = speed * 1000 # convert km/s to m/s
density = density * 1e6 # convert 1/cm**3 to 1/m**3

# see comment on Consts.temporary_density_factor
density = density * Consts.temporary_density_factor

return (
(density * Consts.eff_area * (beta / np.pi) ** (3 / 2))
* (np.exp(-beta * (center_speed**2 + speed**2 - 2 * center_speed * speed)))
Expand Down Expand Up @@ -104,15 +108,43 @@ def optimize_pseudo_parameters(
60000 * (initial_speed_guess / 400) ** 2,
]
)
sol = curve_fit(
f=count_rate,
xdata=energy_passbands.take(range(max_index - 3, max_index + 3), mode="clip"),
ydata=count_rates.take(range(max_index - 3, max_index + 3), mode="clip"),
sigma=count_rate_error.take(range(max_index - 3, max_index + 3), mode="clip"),
p0=initial_param_guess,
)

return sol[0]
sol = None

try:
five_point_range = range(max_index - 2, max_index + 2 + 1)
xdata = energy_passbands.take(five_point_range, mode="clip")
ydata = count_rates.take(five_point_range, mode="clip")
sigma = count_rate_error.take(five_point_range, mode="clip")
curve_fit_output = curve_fit(
f=count_rate,
xdata=xdata,
ydata=ydata,
sigma=sigma,
p0=initial_param_guess,
)

# If covariance matrix is not finite, scipy failed to converge to a
# solution and could just be reporting the initial guess
covariance_matrix_is_finite = np.all(np.isfinite(curve_fit_output[1]))

# fit has failed if R^2 < 0.7
yfit = count_rate(xdata, *curve_fit_output[0])
r2 = 1 - np.sum((ydata - yfit) ** 2) / np.sum((ydata - ydata.mean()) ** 2)
r2_is_acceptable = r2 >= 0.7

if covariance_matrix_is_finite and r2_is_acceptable:
sol = curve_fit_output[0]
except RuntimeError:
logger.error("curve_fit failed")
sol = None

# report speed only if fit fails
if sol is None:
sol = initial_param_guess.copy()
sol[1:] = FILLVAL_FLOAT32

return sol


def geometric_mean(
Expand Down Expand Up @@ -237,8 +269,10 @@ def process_swapi_ialirt(
grouped_subset = grouped_dataset.sel(epoch=grouped_dataset.group == group)

raw_coin_count = process_sweep_data(grouped_subset, "swapi_coin_cnt")
# I-ALiRT packets are 16 times less than the regular science packets.
raw_coin_count = raw_coin_count * 16
# I-ALiRT packets have counts compressed by a factor of 16.
# Add 8 to avoid having counts truncated to 0 and to avoid
# counts being systematically too low
raw_coin_count = raw_coin_count * 16 + 8
# Subset to only the relevant I-ALiRT energy steps
raw_coin_count = raw_coin_count[:, :NUM_IALIRT_ENERGY_STEPS]
raw_coin_rate = raw_coin_count / SWAPI_LIVETIME
Expand Down Expand Up @@ -290,6 +324,21 @@ def process_swapi_ialirt(
pseudo_proton_temperature_list[-5:],
)

# replace nans (resulting from geometric means that
# include fill values) with fill values
(
avg_pseudo_proton_speed,
avg_pseudo_proton_density,
avg_pseudo_proton_temperature,
) = np.nan_to_num(
(
avg_pseudo_proton_speed,
avg_pseudo_proton_density,
avg_pseudo_proton_temperature,
),
nan=FILLVAL_FLOAT32,
)

swapi_data.append(
_populate_instrument_header_items(met)
| {
Expand Down
130 changes: 123 additions & 7 deletions imap_processing/tests/ialirt/unit/test_process_swapi.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@

from imap_processing import imap_module_directory
from imap_processing.ialirt.l0.process_swapi import (
FILLVAL_FLOAT32,
Consts,
count_rate,
geometric_mean,
optimize_pseudo_parameters,
Expand Down Expand Up @@ -184,8 +186,8 @@ def test_count_rate():
"""Use random realistic values to test for expected output of count_rate()."""

actual_result = count_rate(1370, *[550, 5.27, 1e5])
expected_result = 3073.023325893161
assert actual_result == expected_result, (
expected_result = 3073.023325893161 * Consts.temporary_density_factor
assert np.isclose(actual_result, expected_result), (
f"The actual result of count_rate()"
f" {actual_result} does not "
f"match the expected result "
Expand All @@ -202,24 +204,24 @@ def test_optimize_parameters():
"file_name": "ialirt_test_data_u_sw_550_n_sw_5_T_sw_100000_v2.csv",
"expected_values": { # expected output and acceptable tolerance
"pseudo_speed": (550, 0.01),
"pseudo_density": (5, 0.14),
"pseudo_temperature": (1e5, 0.2),
"pseudo_density": (5 / Consts.temporary_density_factor, 0.14),
"pseudo_temperature": (1e5, 0.25),
},
},
"test_set_2": {
"file_name": "ialirt_test_data_u_sw_650_n_sw_3.0_T_sw_120000_v2.csv",
"expected_values": { # expected output and acceptable tolerance
"pseudo_speed": (650, 0.01),
"pseudo_density": (3, 0.3),
"pseudo_density": (3 / Consts.temporary_density_factor, 0.3),
"pseudo_temperature": (1.2e5, 0.28),
},
},
"test_set_3": {
"file_name": "ialirt_test_data_u_sw_400_n_sw_6.0_T_sw_80000_v2.csv",
"expected_values": { # expected output and acceptable tolerance
"pseudo_speed": (400, 0.01),
"pseudo_density": (6, 0.39),
"pseudo_temperature": (8e4, 0.15),
"pseudo_density": (6 / Consts.temporary_density_factor, 0.39),
"pseudo_temperature": (8e4, 0.2),
},
},
}
Expand Down Expand Up @@ -259,6 +261,120 @@ def test_optimize_parameters():
)


def test_optimize_parameters_exception_handling():
"""Test that the optimize_pseudo_parameters() function reports
speed only when given data that causes curve_fit to fail."""

expected_speed = 557.279273 # peak passband speed
file_name = "ialirt_test_data_u_sw_550_n_sw_5_T_sw_100000_v2.csv"

calibration_test_file = pd.read_csv(
f"{imap_module_directory}/tests/ialirt/data/l0/swapi_ialirt_energy_steps.csv"
)
energy_passbands = calibration_test_file["Energy"][0:63].to_numpy().astype(float)

energy_data = pd.read_csv(
f"{imap_module_directory}/tests/ialirt/data/l0/{file_name}"
)
count_rates = energy_data["Count Rates [Hz]"].to_numpy()
count_rates[0] = 0.0
count_rates = np.tile(count_rates, (2, 1))
count_rates_errors = energy_data["Count Rates Error [Hz]"].to_numpy()

"""
code to select the random seed:
for i in range(100):
np.random.seed(i)
result = optimize_pseudo_parameters(count_rates *
np.abs(np.random.standard_normal(size=count_rates.shape)),
count_rates_errors, energy_passbands)
if np.isclose(result['pseudo_speed'][0], expected_speed,
rtol=1e-6) and np.isnan(result['pseudo_density'][0]):
print(i)
"""
np.random.seed(14)
speed, density, temperature = optimize_pseudo_parameters(
count_rates * np.abs(np.random.standard_normal(size=count_rates.shape)),
count_rates_errors,
energy_passbands,
)

np.testing.assert_allclose(speed, expected_speed, rtol=1e-6)
np.testing.assert_allclose(density, FILLVAL_FLOAT32)
np.testing.assert_allclose(temperature, FILLVAL_FLOAT32)


def test_optimize_parameters_bad_fit_handling():
"""Test that the optimize_pseudo_parameters() function
reports speed only when the fit is too poor."""

file_name = "ialirt_test_data_u_sw_550_n_sw_5_T_sw_100000_v2.csv"

calibration_test_file = pd.read_csv(
f"{imap_module_directory}/tests/ialirt/data/l0/swapi_ialirt_energy_steps.csv"
)
energy_passbands = calibration_test_file["Energy"][0:63].to_numpy().astype(float)

energy_data = pd.read_csv(
f"{imap_module_directory}/tests/ialirt/data/l0/{file_name}"
)
count_rates = energy_data["Count Rates [Hz]"].to_numpy()
count_rates[0] = 0.0
count_rates_errors = energy_data["Count Rates Error [Hz]"].to_numpy()

# add high-amplitude randomness to the count rates to make the fit poor
np.random.seed(0)
count_rates = count_rates + np.abs(
np.random.standard_normal(size=count_rates.shape) * count_rates.max()
)

speed, density, temperature = optimize_pseudo_parameters(
count_rates, count_rates_errors, energy_passbands
)

expected_speed = (
np.sqrt(energy_passbands[count_rates.argmax(axis=-1)]) * Consts.speed_coeff
)

np.testing.assert_allclose(speed, expected_speed, rtol=1e-6)
np.testing.assert_allclose(density, FILLVAL_FLOAT32)
np.testing.assert_allclose(temperature, FILLVAL_FLOAT32)


def test_optimize_parameters_bad_covariance_handling():
"""Test that the optimize_pseudo_parameters() function
reports speed only when output covariance is nonsensical."""

file_name = "ialirt_test_data_u_sw_550_n_sw_5_T_sw_100000_v2.csv"

calibration_test_file = pd.read_csv(
f"{imap_module_directory}/tests/ialirt/data/l0/swapi_ialirt_energy_steps.csv"
)
energy_passbands = calibration_test_file["Energy"][0:63].to_numpy().astype(float)

energy_data = pd.read_csv(
f"{imap_module_directory}/tests/ialirt/data/l0/{file_name}"
)
count_rates = energy_data["Count Rates [Hz]"].to_numpy()
count_rates[0] = 0.0
count_rates_errors = energy_data["Count Rates Error [Hz]"].to_numpy()

# setting errors to 0 results in infinite covariance
count_rates_errors *= 0

speed, density, temperature = optimize_pseudo_parameters(
count_rates, count_rates_errors, energy_passbands
)

expected_speed = (
np.sqrt(energy_passbands[count_rates.argmax(axis=-1)]) * Consts.speed_coeff
)

np.testing.assert_allclose(speed, expected_speed, rtol=1e-6)
np.testing.assert_allclose(density, FILLVAL_FLOAT32)
np.testing.assert_allclose(temperature, FILLVAL_FLOAT32)


def test_geometric_mean():
"""Test geometric_mean function."""

Expand Down