Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
187 changes: 138 additions & 49 deletions src/autoprof/autoprofutils/SharedFunctions.py
Original file line number Diff line number Diff line change
Expand Up @@ -473,6 +473,49 @@ def interpolate_bicubic(dat, X, Y):
return f_interp(Y, X, grid=False)


def _cubic_lagrange_weights(t):
return np.array(
[
-t * (t - 1) * (t - 2) / 6,
(t + 1) * (t - 1) * (t - 2) / 2,
-(t + 1) * t * (t - 2) / 2,
(t + 1) * t * (t - 1) / 6,
]
)


def _interpolate_bicubic_local(dat, X, Y, mask=None):
flux = np.full(len(X), np.nan)
valid = np.ones(len(X), dtype=bool)

for i in range(len(X)):
if not np.isfinite(X[i]) or not np.isfinite(Y[i]):
valid[i] = False
continue

xbase = int(np.floor(X[i]))
ybase = int(np.floor(Y[i]))
x0, x1 = xbase - 1, xbase + 3
y0, y1 = ybase - 1, ybase + 3
if x0 < 0 or y0 < 0 or x1 > dat.shape[1] or y1 > dat.shape[0]:
valid[i] = False
continue

chunk = dat[y0:y1, x0:x1]
if not np.all(np.isfinite(chunk)):
valid[i] = False
continue
if mask is not None and np.any(mask[y0:y1, x0:x1]):
valid[i] = False
continue

wx = _cubic_lagrange_weights(X[i] - xbase)
wy = _cubic_lagrange_weights(Y[i] - ybase)
flux[i] = np.sum(chunk * wy.reshape((-1, 1)) * wx)

return flux, valid


def interpolate_Lanczos(dat, X, Y, scale):
"""
Perform Lanczos interpolation on an image.
Expand Down Expand Up @@ -517,6 +560,30 @@ def interpolate_Lanczos(dat, X, Y, scale):
return np.array(flux)


def _interpolation_footprint_valid(dat, X, Y, mask=None, interp_method="lanczos", interp_window=5):
valid = np.ones(len(X), dtype=bool)
for i in range(len(X)):
if interp_method == "lanczos":
x0 = int(round(np.floor(X[i]) - interp_window + 1))
x1 = int(round(np.floor(X[i]) + interp_window + 1))
y0 = int(round(np.floor(Y[i]) - interp_window + 1))
y1 = int(round(np.floor(Y[i]) + interp_window + 1))
else:
raise ValueError(
"Unknown interpolate method %s. Should be lanczos" % interp_method
)
if x0 < 0 or y0 < 0 or x1 > dat.shape[1] or y1 > dat.shape[0]:
valid[i] = False
continue
chunk = dat[y0:y1, x0:x1]
if chunk.size == 0 or not np.all(np.isfinite(chunk)):
valid[i] = False
continue
if mask is not None and np.any(mask[y0:y1, x0:x1]):
valid[i] = False
return valid


def _iso_between(
IMG,
sma_low,
Expand Down Expand Up @@ -560,38 +627,32 @@ def _iso_between(
RR /= SuperEllipse_Rscale * Fmode_Rscale
rselect = np.logical_and(RR < sma_high, RR > sma_low)
fluxes = IMG[ranges[1][0] : ranges[1][1], ranges[0][0] : ranges[0][1]][rselect]
CHOOSE = None
if not mask is None and sma_high > 5:
CHOOSE = np.logical_not(
mask[ranges[1][0] : ranges[1][1], ranges[0][0] : ranges[0][1]][rselect]
theta_values = theta[rselect]
CHOOSE = np.isfinite(fluxes)
if not mask is None:
CHOOSE = np.logical_and(
CHOOSE,
np.logical_not(
mask[ranges[1][0] : ranges[1][1], ranges[0][0] : ranges[0][1]][rselect]
),
)
# Perform sigma clipping if requested
if sigmaclip:
sclim = Sigma_Clip_Upper(fluxes, sclip_iterations, sclip_nsigma)
if CHOOSE is None:
CHOOSE = fluxes < sclim
else:
CHOOSE = np.logical_or(CHOOSE, fluxes < sclim)
if CHOOSE is not None and np.sum(CHOOSE) < 5:
if np.any(CHOOSE):
sclim = Sigma_Clip_Upper(fluxes[CHOOSE], sclip_iterations, sclip_nsigma)
clipped_choose = np.logical_and(CHOOSE, fluxes < sclim)
if np.any(clipped_choose):
CHOOSE = clipped_choose
countmasked = np.sum(np.logical_not(CHOOSE))
if np.sum(CHOOSE) < 5:
logging.warning(
"Entire Isophote is Masked! R_l: %.3f, R_h: %.3f, PA: %.3f, ellip: %.3f"
"Too few usable pixels in isophote band! R_l: %.3f, R_h: %.3f, PA: %.3f, ellip: %.3f"
% (sma_low, sma_high, PARAMS["pa"] * 180 / np.pi, PARAMS["ellip"])
)
CHOOSE = np.ones(CHOOSE.shape).astype(bool)
if CHOOSE is not None:
countmasked = np.sum(np.logical_not(CHOOSE))
else:
countmasked = 0
if more:
if CHOOSE is not None and sma_high > 5:
return fluxes[CHOOSE], theta[rselect][CHOOSE], countmasked
else:
return fluxes, theta[rselect], countmasked
return fluxes[CHOOSE], theta_values[CHOOSE], countmasked
else:
if CHOOSE is not None and sma_high > 5:
return fluxes[CHOOSE]
else:
return fluxes
return fluxes[CHOOSE]


def _iso_extract(
Expand Down Expand Up @@ -651,55 +712,64 @@ def _iso_extract(
theta = theta[BORDER]

Rlim = np.max(R)
footprint_choose = None
if Rlim < rad_interp:
box = [
[max(0, int(c["x"] - Rlim - 5)), min(IMG.shape[1], int(c["x"] + Rlim + 5))],
[max(0, int(c["y"] - Rlim - 5)), min(IMG.shape[0], int(c["y"] + Rlim + 5))],
]
if interp_method == "bicubic":
flux = interpolate_bicubic(
IMG[box[1][0] : box[1][1], box[0][0] : box[0][1]],
X - box[0][0],
Y - box[1][0],
)
flux, footprint_choose = _interpolate_bicubic_local(IMG, X, Y, mask=mask)
elif interp_method == "lanczos":
flux = interpolate_Lanczos(IMG, X, Y, interp_window)
footprint_choose = _interpolation_footprint_valid(
IMG,
X,
Y,
mask=mask,
interp_method=interp_method,
interp_window=interp_window,
)
else:
raise ValueError(
"Unknown interpolate method %s. Should be one of lanczos or bicubic" % interp_method
)
else:
# round to integers and sample pixels values
flux = IMG[np.rint(Y).astype(np.int32), np.rint(X).astype(np.int32)]
# CHOOSE holds bolean array for which flux values to keep, initialized as None for no clipping
CHOOSE = None
# CHOOSE holds bolean array for which flux values to keep.
# Interpolated samples are rejected when their interpolation footprint
# contains masked or non-finite pixels.
finite_flux = np.isfinite(flux)
CHOOSE = finite_flux.copy()
# Mask pixels if a mask is given
if not mask is None:
CHOOSE = np.logical_not(mask[np.rint(Y).astype(np.int32), np.rint(X).astype(np.int32)])
CHOOSE = np.logical_and(
CHOOSE, np.logical_not(mask[np.rint(Y).astype(np.int32), np.rint(X).astype(np.int32)])
)
if footprint_choose is not None:
CHOOSE = np.logical_and(CHOOSE, footprint_choose)
# Perform sigma clipping if requested
if sigmaclip:
sclim = Sigma_Clip_Upper(flux, sclip_iterations, sclip_nsigma)
if CHOOSE is None:
CHOOSE = flux < sclim
else:
CHOOSE = np.logical_or(CHOOSE, flux < sclim)
if np.any(CHOOSE):
sclim = Sigma_Clip_Upper(flux[CHOOSE], sclip_iterations, sclip_nsigma)
clipped_choose = np.logical_and(CHOOSE, flux < sclim)
if np.any(clipped_choose):
CHOOSE = clipped_choose
# Dont clip pixels if that removes all of the pixels
countmasked = 0
if not CHOOSE is None and np.sum(CHOOSE) <= 0:
countmasked = np.sum(np.logical_not(CHOOSE))
if np.sum(CHOOSE) <= 0:
logging.warning(
"Entire Isophote is Masked! R: %.3f, PA: %.3f, ellip: %.3f"
% (sma, PARAMS["pa"] * 180 / np.pi, PARAMS["ellip"])
)
flux = flux[CHOOSE]
theta = theta[CHOOSE]
# Interpolate clipped flux values if requested
elif not CHOOSE is None and interp_mask:
elif interp_mask:
flux[np.logical_not(CHOOSE)] = np.interp(
theta[np.logical_not(CHOOSE)], theta[CHOOSE], flux[CHOOSE], period=2 * np.pi
)
# simply remove all clipped pixels if user doesn't reqest another option
elif not CHOOSE is None:
elif not np.all(CHOOSE):
flux = flux[CHOOSE]
theta = theta[CHOOSE]
countmasked = np.sum(np.logical_not(CHOOSE))

# Return just the flux values, or flux and angle values
if more:
Expand All @@ -708,7 +778,7 @@ def _iso_extract(
return flux


def _iso_line(IMG, length, width, pa, c, more=False):
def _iso_line(IMG, length, width, pa, c, more=False, mask=None):

start = np.array([c["x"], c["y"]])
end = start + length * np.array([np.cos(pa), np.sin(pa)])
Expand All @@ -733,11 +803,21 @@ def _iso_line(IMG, length, width, pa, c, more=False):

lselect = np.logical_and.reduce((XX >= -0.5, XX <= length, np.abs(YY) <= (width / 2)))
flux = IMG[ranges[1][0] : ranges[1][1], ranges[0][0] : ranges[0][1]][lselect]
Xline = XX[lselect]
Yline = YY[lselect]
CHOOSE = np.isfinite(flux)
if mask is not None:
CHOOSE = np.logical_and(
CHOOSE,
np.logical_not(
mask[ranges[1][0] : ranges[1][1], ranges[0][0] : ranges[0][1]][lselect]
),
)

if more:
return flux, XX[lselect], YY[lselect]
return flux[CHOOSE], Xline[CHOOSE], Yline[CHOOSE]
else:
return flux, XX[lselect]
return flux[CHOOSE], Xline[CHOOSE]


def StarFind(
Expand Down Expand Up @@ -970,6 +1050,15 @@ def StarFind(
}


def _photutils_masked_data(dat, results):
mask = np.logical_not(np.isfinite(dat))
if results.get("mask", None) is not None:
mask = np.logical_or(mask, results["mask"])
if np.any(mask):
return np.ma.array(dat, mask=mask)
return dat


def _x_to_pa(x):
"""
Internal, basic function to ensure position angles remain
Expand Down
1 change: 1 addition & 0 deletions src/autoprof/pipeline_steps/Axial_Profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,7 @@ def Axial_Profiles(IMG, results, options):
"y": results["center"]["y"]
+ ang * rd * pR * np.sin(pa + (0 if ang > 0 else np.pi)),
},
mask=mask,
)
for oi, oR in enumerate(R):
length = (R[oi] - R[oi - 1]) if oi > 0 else 1.0
Expand Down
Loading
Loading