import numpy as np
__all__ = ["NSClean", "make_lowpass_filter", "med_abs_deviation", "NSCleanSubarray"]
[docs]
class NSClean:
"""
JWST NIRSpec background modeling and subtraction -AKA "clean" (NSClean).
Fit and remove 1/f noise in NIR detectors in frequency space.
NSClean is the base class for removing residual correlated
read noise from JWST NIRSpec images. It is intended for use
on Level 2a pipeline products, i.e., IRS2 corrected slope
images. All processing is done in detector coordinates, with
arrays transposed and flipped so that the IRS2 "zipper"
appears along the bottom. For most users, NSClean's :meth:`clean`
method automatically transposes and flips the data as
necessary.
Parameters
----------
detector : str
A string selected from ``{'NRS1', 'NRS2'}``
mask : ndarray of bool
The background model is fitted to pixels set to `True`.
Pixels set to `False` are ignored.
fc : float
Critical frequency. This is the 1/2 power point for NSClean's
low pass filter. The units are 1/pixels.
kill_width : float
"Kill width". The low pass filter's cutoff is 1/2 of a cosine.
The filter function goes from 1x to 0x gain over this bandwidth.
The units are 1/pixels.
buffer_sigma : float
Standard deviation of the buffer's Gaussian smooth. This is an
optional 1-dimensional smooth in the spectral dispersion direction
only. If selected, buffing is done after modeling the background
but before subtracting it.
sigrej : float
Standard deviation threshold for flagging statistical outliers,
to exclude them from use in fitting the background model.
weights_kernel_sigma : int
Used to assign weights for MASK mode fitting.
"""
def __init__(
self,
detector,
mask,
fc=1 / (2 * 2048 / 16),
kill_width=1 / (2 * 2048 / 16) / 4,
buffer_sigma=1.5,
sigrej=3.0,
weights_kernel_sigma=32,
):
self.detector = detector
self.mask = mask
self.fc = fc
self.kill_width = kill_width
self.buffer_sigma = buffer_sigma
self.sigrej = sigrej
self.weights_kernel_sigma = weights_kernel_sigma
# Transpose and flip mask to detector coordinates with the IRS2
# zipper running along the bottom as displayed in ds9.
if self.detector == "NRS1":
# NRS1 requires transpose only
self.mask = self.mask.transpose()
else:
# NRS2 requires transpose and flip
self.mask = self.mask.transpose()[::-1]
self.ny = self.mask.shape[0]
self.nx = self.mask.shape[1]
# FFT frequencies
self.rfftfreq = np.fft.rfftfreq(self.ny)
# Extract other necessary information from model_parameters and build the
# low pass filter. This also sets the number of Fourier vectors that
# we need to project out.
self.apodizer = np.array(make_lowpass_filter(self.fc, self.kill_width, self.nx))
self.nvec = np.sum(self.apodizer > 0) # Project out this many frequencies
# Only MASK mode uses a weighted fit. Compute the weights here. The aim is
# to weight by the reciprocal of the local background sample density along
# each line. Roughly approximate the local density, P, using the reciprocal of
# convolution by a Gaussian kernel for now. For now, hard code the kernel. We
# will optimize this later.
_weight = np.zeros((self.ny, self.nx), dtype=np.float32) # Build the kernel here
_x = np.arange(self.nx)
_mu = self.nx // 2 + 1
_sigma = self.weights_kernel_sigma
_weight[self.ny // 2 + 1] = (
np.exp(-((_x - _mu) ** 2) / _sigma**2 / 2) / _sigma / np.sqrt(2 * np.pi)
)
_weight_fft = np.fft.rfft2(np.fft.ifftshift(_weight))
with np.errstate(divide="ignore"):
self.p_matrix = 1 / np.fft.irfft2(
np.fft.rfft2(np.array(self.mask, dtype=np.float32)) * _weight_fft,
(self.ny, self.nx),
)
# Illuminated areas carry no weight
self.p_matrix = np.where(self.mask, self.p_matrix, 0.0)
# Set bad weights to zero
self.p_matrix[~np.isfinite(self.p_matrix)] = 0.0
# Build a 1-dimensional Gaussian kernel for "buffing". Buffing is in the
# dispersion direction only. In detector coordinates, this is axis zero.
# Even though the kernel is 1-dimensional, we must still use a 2-dimensional
# array to represent it. I tried broadcasting a vector, but that made a kernel
# 2048 columns wide (in detector space).
_y = np.arange(self.ny)
_mu = self.nx // 2 + 1
_sigma = self.buffer_sigma # Standard deviation of kernel
_gkern = (
np.exp(-(((_y - _mu) / _sigma) ** 2) / 2) / _sigma / np.sqrt(2 * np.pi)
) # Centered kernel as a vector
gkern = np.zeros((self.ny, self.nx), dtype=np.float32) # 2D kernel template
gkern[:, _mu] = _gkern # Copy in the kernel. Normalization is already correct.
gkern = np.fft.ifftshift(gkern) # Shift for Numpy
# FFT for fast convolution
self.fgkern = np.array(np.fft.rfft2(gkern), dtype=np.complex64)
[docs]
def fit(self, data):
"""
Fit a background model to the supplied frame of data.
Parameters
----------
data : ndarray of float
A NIRSpec image. The image must be in the detector-space
orientation with the IRS2 zipper running along the bottom as
displayed in SAOImage DS9.
Returns
-------
bkg : ndarray of float
The fitted background model.
Notes
-----
Fitting is done line by line because the matrices get very big if one
tries to project out Fourier vectors from the entire 2K x 2K image area.
"""
model = np.zeros((self.ny, self.nx), dtype=np.float32) # Build the model here
for y in np.arange(self.ny)[4:-4]:
# Get data and weights for this line
d = data[y][self.mask[y]] # unmasked (usable) data
# The line below uses a vector to represent a diagonal weight matrix.
# Multiplications by this vector later on may be viewed as
# equivalent formulations to multiplication by diag(p).
p = self.p_matrix[y][self.mask[y]] # Weights
# If none of the pixels in this line is usable (all masked out),
# skip and move on to the next line.
if len(d) == 0:
continue
# Fill statistical outliers with line median. We know that the rolling
# median cleaning technique worked reasonably well, so this is a fast
# justifiable approximation.
_mu = np.median(d) # Robust estimate of mean
_sigma = 1.4826 * np.median(np.abs(d - _mu)) # Robust estimate of standard deviation
# Fill outliers
d = np.where(
np.logical_and(_mu - self.sigrej * _sigma <= d, d <= _mu + self.sigrej * _sigma),
d,
_mu,
)
# Build the Fourier basis matrix for this line
# Must be a column vector to broadcast
m = np.arange(self.nx)[self.mask[y]].reshape((-1, 1))
# Must be a row vector to broadcast. We can optimize
# the code later by putting this into object instantiation
# since it is the same for every line. For now, leave it
# here since the cost is negligible and it may aid
# comprehension.
k = np.arange(self.nvec).reshape((1, -1))
# Build the basis matrix
basis = np.array(
np.exp(2 * np.pi * 1j * m * k / self.nx) / m.shape[0], dtype=np.complex64
)
# Compute the Moore-Penrose inverse of A = P*B.
# $A^+ = (A^H A)^{-1} A^H$
_a = basis * p[:, np.newaxis]
_a_h = np.conjugate(_a.transpose()) # Hermitian transpose of A
pinv_pb = np.matmul(np.linalg.inv(np.matmul(_a_h, _a)), _a_h)
# Solve for the Fourier transform of this line's background samples.
# The way that we have done it, this multiplies the input data by the
# number of samples used for the fit.
rfft = np.zeros(self.nx // 2 + 1, dtype=np.complex64)
rfft[: k.shape[1]] = np.matmul(pinv_pb, p * d)
# Numpy requires that the forward transform multiply
# the data by n. Correct normalization.
rfft *= self.nx / m.shape[0]
# Apodize if necessary
if self.kill_width > 0:
rfft[: self.nvec] *= self.apodizer[: self.nvec]
# Invert the FFT to build the background model for this line
model[y] = np.fft.irfft(rfft, self.nx)
# Done!
return model
[docs]
def clean(self, data, buff=True):
"""
Clean residual noise in NIRSpec images.
"Clean" NIRspec images by fitting and subtracting the
instrumental background. This is intended to improve the
residual correlated noise (vertical banding) that is
sometimes seen. Because the banding is not seen by the
reference pixels, normal ``IRS^2`` processing does not
remove it.
This is an ad-hoc correction. We model the background by
fitting it in Fourier space. There is an option
to "improve" the result by "buffing" in the spectral
dispersion direction. "Buffing" is light smoothing using
a 1-dimensional Gaussian.
Parameters
----------
data : ndarray
The input data. This should be the normal end result of Stage 1 processing.
buff : bool
"Buff" the fitted spectrum by applying a slight Gaussian blur
in the spectral dispersion direction.
Returns
-------
data : ndarray
The data, but with less striping and the background subtracted.
"""
# Transform the data to detector space with the IRS2 zipper running along the bottom.
if self.detector == "NRS2":
# Transpose and flip for NRS2
data = data.transpose()[::-1]
else:
# Transpose (no flip) for NRS1
data = data.transpose()
# Fit the background model
bkg = self.fit(data) # Background model
# Buff, if requested
if buff:
bkg = np.fft.irfft2(np.fft.rfft2(bkg) * self.fgkern, s=bkg.shape)
# Subtract the background model from the data
data -= bkg
# Transform back to DMS space
if self.detector == "NRS2":
data = data[::-1].transpose()
else:
data = data.transpose()
# Done
return data
[docs]
def make_lowpass_filter(f_half_power, w_cutoff, n, d=1.0):
"""
Make a lowpass Fourier filter.
Parameters
----------
f_half_power : float
Half power frequency
w_cutoff : float
Width of cosine cutoff. The response transitions
from 1x to 0x over this range of frequencies
n : int
Number of samples in the timeseries
d : float
Sample spacing (inverse of the sampling rate). Defaults to 1.
Returns
-------
filt : ndarray
Filter array
"""
# Make frequencies vector
freq = np.fft.rfftfreq(n, d=d)
# Build a cosine wave that is appropriately shifted
cos = (1 + np.cos(np.pi * (freq - f_half_power) / w_cutoff + np.pi / 2)) / 2
# Construct low-pass filter with cosine rolloff
filt = np.where(freq <= f_half_power - w_cutoff / 2, 1, cos)
filt = np.where(freq <= f_half_power + w_cutoff / 2, filt, 0)
# Done
return filt
[docs]
def med_abs_deviation(d, median=True):
"""
Compute the median absolute deviation.
Computes the median and the median absolute deviation (MAD). For normally
distributed data, multiply the MAD by 1.4826 to approximate standard deviation.
If ``median=True`` (the default), this returns a tuple containing (median, MAD).
Otherwise, only the MAD is returned.
Parameters
----------
d : ndarray
The input data
median : bool
Return both the median and MAD
Returns
-------
m : float, optional
Median
mad : float
Median absolute deviation
"""
d = d[np.isfinite(d)] # Exclude NaNs
m = np.median(d)
mad = np.median(np.abs(d - m))
if median is True:
return (m, mad)
else:
return mad
[docs]
class NSCleanSubarray:
"""
Background modeling and subtraction for generic JWST near-IR subarrays.
Fit and remove 1/f noise in NIR detector subarrayss in frequency space.
NSCleanSubarray is the base class for removing residual correlated
read noise from generic JWST near-IR Subarray images. It is
intended for use on Level 2a pipeline products, i.e., slope images.
Parameters
----------
data : ndarray of float
The 2D input image data array to be operated on.
mask : ndarray of bool
The background model is fitted to pixels set to `True`.
Pixels set to `False` are ignored.
fc : tuple
Apodizing filter definition. These parameters are tunable. They
happen to work well for NIRSpec BOTS exposures:
1. Unity gain for ``f < fc[0]``
2. Cosine roll-off from ``fc[0]`` to ``fc[1]``
3. Zero gain from ``fc[1]`` to ``fc[2]``
4. Cosine roll-on from ``fc[2]`` to ``fc[3]``
exclude_outliers : bool
Exclude statistical outliers and their nearest neighbors
from the background pixels mask.
weights_kernel_sigma : float
Standard deviation of the 1-dimensional Gaussian kernel
that is used to approximate background sample density. This
is ad-hoc. See the NSClean journal article for more information. The
default for subarrays results in nearly equal weighting of all background
samples.
Notes
-----
NSCleanSubarray works in detector coordinates. Both the data and mask
need to be transposed and flipped so that slow-scan runs from bottom
to top as displayed in SAOImage DS9. The fast scan direction is
required to run from left to right.
"""
# Class variables. These are the same for all instances.
nloh = np.int32(12) # New line overhead in pixels
tpix = np.float32(10.0e-6) # Pixel dwell time in seconds
sigrej = np.float32(4.0) # Standard deviation threshold for flagging
# statistical outliers.
def __init__(
self,
data,
mask,
fc=(1061, 1211, 49943, 49957),
exclude_outliers=True,
weights_kernel_sigma=None,
):
# Definitions
self.data = np.array(data, dtype=np.float32)
self.mask = np.array(mask, dtype=np.bool_)
self.ny = np.int32(data.shape[0]) # Number of pixels in slow scan direction
self.nx = np.int32(data.shape[1]) # Number of pixels in fast scan direction
self.fc = np.array(fc, dtype=np.float32)
self.n = np.int32(self.ny * (self.nx + self.nloh)) # Number of ticks in clocking pattern
self.rfftfreq = np.array(
np.fft.rfftfreq(self.n, self.tpix), dtype=np.float32
) # Fourier frequencies in clocking pattern
# We will weight by the inverse of the local sample density in time. We compute the local
# sample density by convolution using a Gaussian. Define the standard deviation of the
# Gaussian here. This is ad-hoc.
if weights_kernel_sigma is None:
self.weights_kernel_sigma = 1 / ((fc[0] + fc[1]) / 2) / self.tpix / 2 / 4
else:
self.weights_kernel_sigma = weights_kernel_sigma
# The mask potentially contains NaNs. Exclude them.
self.mask[np.isnan(self.data)] = False
# The mask potentially contains statistical outliers.
# Optionally exclude them.
if exclude_outliers is True:
m, s = med_abs_deviation(
self.data[self.mask]
) # Compute median and median absolute deviation
s *= 1.4826 # Convert MAD to std
vmin = m - self.sigrej * s # Minimum value to keep
vmax = m + self.sigrej * s # Maximum value to keep
# Temporarily change NaNs to inf, so that they don't cause problems.
self.data[np.isnan(self.data)] = np.inf
# Flag statistical outliers
bdpx = np.array(
np.where(np.logical_or(self.data < vmin, self.data > vmax), 1, 0), dtype=np.float32
)
self.data[np.isinf(self.data)] = np.nan # Restore NaNs
# We don't need to worry about non-background pixels
bdpx[np.logical_not(self.mask)] = 0
# Also flag 4 nearest neighbors
bdpx = (
bdpx
+ np.roll(bdpx, (+1, 0), axis=(0, 1))
+ np.roll(bdpx, (-1, 0), axis=(0, 1))
+ np.roll(bdpx, (0, +1), axis=(0, 1))
+ np.roll(bdpx, (0, -1), axis=(0, 1))
)
# bdpx now contains the pixels to exclude from the background pixels
# mask. Exclude them.
self.mask[bdpx != 0] = False
self.data -= m # STUB - Median subtract
# Build the apodizing filter. This has unity gain at low frequency to capture 1/f. It
# also has unity gain at Nyquist to capture alternating column noise.
# At mid frequencies, the gain is zero. Unity gain at low frequencies.
self.apodizer = np.zeros(len(self.rfftfreq), dtype=np.float32)
self.apodizer[self.rfftfreq < self.fc[0]] = 1.0
# Cosine roll-off
here = np.logical_and(self.fc[0] <= self.rfftfreq, self.rfftfreq < self.fc[1])
self.apodizer[here] = 0.5 * (
np.cos((np.pi / (self.fc[1] - self.fc[0])) * (self.rfftfreq[here] - self.fc[0])) + 1
)
# Cosine roll-on
here = np.logical_and(self.fc[2] <= self.rfftfreq, self.rfftfreq < self.fc[3])
self.apodizer[here] = 0.5 * (
np.cos((np.pi / (self.fc[3] - self.fc[2])) * (self.rfftfreq[here] - self.fc[2]) + np.pi)
+ 1
)
# Unity gain between f[3] and end
self.apodizer[self.rfftfreq >= self.fc[3]] = 1.0
[docs]
def fit(self, return_fit=False, weight_fit=False):
"""
Fit a background model to the data.
Parameters
----------
return_fit : bool
Return the Fourier transform.
weight_fit : bool
Use weighted least squares as described in the NSClean paper
(see :ref:`nsclean-algo-references`).
`False` by default. For subarrays it is TBD if this is necessary.
Returns
-------
rfft : ndarray
The computed Fourier transform.
"""
# To build the incomplete Fourier matrix, we require the index of each
# clock tick of each valid pixel in the background samples. For consistency with
# numpy's notation, we call this 'm' and require it to be a column vector.
_x = np.arange(self.nx).reshape((1, -1))
_y = np.arange(self.ny).reshape((-1, 1))
m = (_y * (self.nx + self.nloh) + _x)[self.mask].reshape((-1, 1))
# Define which Fourier vectors to fit. For consistency with numpy, call this k.
k = np.arange(len(self.rfftfreq))[self.apodizer > 0.0].reshape((1, -1))
# Build the incomplete Fourier matrix
basis = np.array(np.exp(2 * np.pi * 1j * m * k / self.n) / m.shape[0], dtype=np.complex64)
# Weighted NSClean fitting
if weight_fit:
# Build the weight matrix. Weight by the reciprocal of the local background
# sample density in time. Roughly approximate the local density, P, using
# the reciprocal of convolution by a Gaussian kernel.
_x = np.arange(self.n, dtype=np.float32) # x-values for building a 1-D Gaussian
_mu = np.float32(self.n // 2 + 1) # Center point of Gaussian
_sigma = np.float32(self.weights_kernel_sigma) # Standard deviation of Gaussian
_weight = (
np.exp(-((_x - _mu) ** 2) / _sigma**2 / 2) / _sigma / np.sqrt(2 * np.pi)
) # Build centered Gaussian
_weight_fft = np.fft.rfft(np.fft.ifftshift(_weight)) # Forward FFT
_m = np.hstack(
(self.mask, np.zeros((self.ny, self.nloh), dtype=np.bool_))
).flatten() # Add new line overhead to mask
with np.errstate(divide="ignore"):
p_matrix = 1 / np.fft.irfft(
np.fft.rfft(np.array(_m, dtype=np.float32)) * _weight_fft, self.n
) # Compute weights
# Keep only background samples
p_matrix = p_matrix[_m]
# Set bad weights to zero
p_matrix[~np.isfinite(p_matrix)] = 0.0
# NSClean's weighting requires the Moore-Penrose inverse of A = P*B.
# $A^+ = (A^H A)^{-1} A^H$
# P is diagonal. Hadamard product is most RAM efficient
_a = p_matrix.reshape((-1, 1)) * basis
# Hermitian transpose of A
_a_h = np.conjugate(_a.transpose())
pinv_pb = np.matmul(np.linalg.inv(np.matmul(_a_h, _a)), _a_h)
else:
# Unweighted fit
pinv_b = np.linalg.pinv(basis)
# Solve for the (approximate) Fourier transform of the background samples.
rfft = np.zeros(len(self.rfftfreq), dtype=np.complex64)
if weight_fit is True:
rfft[self.apodizer > 0.0] = np.matmul(
pinv_pb * p_matrix.reshape((1, -1)), self.data[self.mask]
)
else:
rfft[self.apodizer > 0.0] = np.matmul(pinv_b, self.data[self.mask])
# Numpy requires that the forward transform multiply
# the data by n. Correct normalization.
rfft *= self.n / m.shape[0]
# Invert the apodized Fourier transform to build the background model for this integration
self.model = np.fft.irfft(rfft * self.apodizer, self.n).reshape((self.ny, -1))[:, : self.nx]
# Done
if return_fit:
return rfft
[docs]
def clean(self, weight_fit=True, return_model=False):
"""
Clean the data.
Parameters
----------
weight_fit : bool
Use weighted least squares as described in the NSClean paper
(see :ref:`nsclean-algo-references`).
Otherwise, it is a simple unweighted fit.
return_model : bool
Return the fitted model rather than the corrected data?
Default: `False` (return the corrected data, not the model).
Returns
-------
data : ndarray of float
The cleaned data array.
"""
self.fit(weight_fit=weight_fit) # Fit the background model
if return_model:
return self.model
else:
self.data -= self.model # Overwrite data with cleaned data
return self.data