"""Define classes for smoothing images."""
from abc import ABC, abstractmethod
import scipy
import scipy.linalg
import scipy.interpolate
from superphot_pipeline.iterative_rejection_util import\
iterative_rej_linear_leastsq
from superphot_pipeline.image_utilities import zoom_image, bin_image
git_id = '$Id: 9cdb88bef88a8ee0c8bfe6ba40efd5191eda98a6 $'
[docs]class ImageSmoother(ABC):
"""
Define the interface for applying smoothing algorithms to images.
Attrs:
bin_factor: See same name argument to smooth().
zoom_interp_order: See same name argument to smooth().
"""
@abstractmethod
[docs] def _apply_smoothing(self, image, **kwargs):
"""
Return a smooth version of the given image (no pre-shrinking).
Args:
image: The image to smooth.
kwargs: Any arguments configuring how smoothing is to
be performed.
Returns:
smooth_image: The smoothed version of the image per the currently
defined smoothing.
"""
[docs] def __init__(self, **kwargs):
"""
Set default pre-shrink/post-zoom bin factor and interpolation order.
Args:
kwargs: All arguments except bin_factor, interp_order and padding
mode(see smooth()) are ignored.
Returns:
None
"""
self.bin_factor = kwargs.get('bin_factor', None)
self.zoom_interp_order = kwargs.get('zoom_interp_order', None)
self.padding_mode = kwargs.get('padding_mode', None)
[docs] def smooth(self,
image,
*,
bin_factor=None,
zoom_interp_order=None,
padding_mode='reflect',
**kwargs):
"""
Sandwich smoothing between initial binning and final zoom.
Args:
image: The image to smooth.
bin_factor: The factor to pre-bin the image by (see
bin_image() for details). After smoothing the image is zoomed by
the same factor to recover the original size. If None, the value
defined at construction is used, otherwise this value applies
for this image only.
interp_order: The interpolation order to use when zooming the
image at the end. If None, the interpolation order defined at
construction is used, otherwise this value applies for this
image only.
padding_mode: How to pad the image to have an integer number of
bins, if pre-binning and post-zooming is used.
kwargs: Any arguments configuring how smoothing is to
be performed.
Returns:
smooth_image: The smoothed version of the image.
"""
if bin_factor is None:
bin_factor = self.bin_factor
if zoom_interp_order is None:
zoom_interp_order = self.zoom_interp_order
if bin_factor is None or zoom_interp_order is None:
return self._apply_smoothing(image, **kwargs)
y_res, x_res = image.shape
if x_res % bin_factor != 0 or y_res % bin_factor != 0:
y_padding = int(scipy.ceil(y_res / bin_factor)) * bin_factor - y_res
x_padding = int(scipy.ceil(x_res / bin_factor)) * bin_factor - x_res
left_padding = x_padding // 2
bottom_padding = y_padding // 2
image = scipy.pad(image,
((bottom_padding, y_padding - bottom_padding),
(left_padding, x_padding - left_padding)),
mode=(padding_mode or self.padding_mode))
smooth_image = zoom_image(
self._apply_smoothing(
bin_image(image, bin_factor),
**kwargs
),
bin_factor,
zoom_interp_order
)
if x_res % bin_factor == 0 and y_res % bin_factor == 0:
return smooth_image
return smooth_image[bottom_padding : bottom_padding + y_res,
left_padding : left_padding + x_res]
[docs] def detrend(self, image, **kwargs):
"""De-trend the input image by its smooth version (see smooth)."""
smooth_image = self.smooth(image, **kwargs)
return image / smooth_image * scipy.mean(smooth_image)
[docs]class SeparableLinearImageSmoother(ImageSmoother):
"""
Handle smoothing function = product of linear functions in each dimension.
In more detail this is a base class that perform smoothing with a smoothing
function consisting of the product of separate smoothing functions in x and
y, each of which predicts pixel values as a linear combination of some
parameters.
Attrs:
num_x_terms: The number of terms in the smoothing function in the x
direction.
num_y_terms: The number of terms in the smoothing function in the y
direction.
"""
@abstractmethod
[docs] def get_x_pixel_integrals(self, param_ind, x_resolution):
"""
Return the x smoothing func. integral over pixels for 1 nonzero param.
Args:
param_ind: The index of the input parameter which is non-zero.
x_resolution: The resolution of the input image in the
x direction.
Returns:
integrals: A 1-D scipy array with length equal to the
x-resolution of the image with the i-th entry being the integral
of the x part of the smoothing function the i-th pixel.
"""
@abstractmethod
[docs] def get_y_pixel_integrals(self, param_ind, y_resolution):
"""See get_x_pixel_integrals."""
[docs] def _get_smoothing_matrix(self,
num_x_terms,
num_y_terms,
y_resolution,
x_resolution):
"""
Return matrix giving flattened smooth image when applied to fit params.
Args:
num_x_terms: See same name argument to _apply_smoothing().
num_y_terms: See same name argument to _apply_smoothing().
x_resolution: The number of image columns.
y_resolution: The number of image rows.
Returns:
matrix: An (x_res * y_res) by (num_x_terms * num_y_terms) matrix
which when applied to a set of parameters returns the value of
each image pixel per the smoothing function.
"""
matrix = scipy.empty((x_resolution * y_resolution,
num_x_terms * num_y_terms))
for x_term_index in range(num_x_terms):
x_integrals = self.get_x_pixel_integrals(x_term_index, x_resolution)
for y_term_index in range(num_y_terms):
y_integrals = self.get_y_pixel_integrals(y_term_index,
y_resolution)
matrix[
:,
x_term_index + y_term_index * num_x_terms
] = scipy.outer(y_integrals, x_integrals).flatten()
return matrix
[docs] def __init__(self,
*,
num_x_terms=None,
num_y_terms=None,
outlier_threshold=None,
max_iterations=None,
**kwargs):
"""
Define the default smoothing configuration (overwritable on use).
Args:
num_x_terms: See same name argument to _apply_smoothing().
num_y_terms: See same name argument to _apply_smoothing().
outlier_threshold: See same name argument to _apply_smoothing().
max_iterations: See same name argument to _apply_smoothing().
kwargs: Any arguments to pass to parent's __init__.
Returns:
None
"""
super().__init__(**kwargs)
self.num_x_terms = num_x_terms
self.num_y_terms = num_y_terms
self.outlier_threshold = outlier_threshold
self.max_iterations = max_iterations
#The abstract method was deliberately defined wit flexible arguments
#pylint: disable=arguments-differ
[docs] def _apply_smoothing(self,
image,
*,
num_x_terms=None,
num_y_terms=None,
outlier_threshold=None,
max_iterations=None):
"""
Return a smooth version of the given image.
Args:
image: The image to smooth.
num_x_terms: The number of parameters of the x
smoothing function.
num_y_terms: The number of parameters of the y
smoothing function.
outlier_threshold: The threshold for discarding pixel values as
being outliers. See same name argument
to iterative_rej_linear_leastsq().
max_iterations: The maximum number of reject/re-fit iterations
allowed. See same name argument
to iterative_rej_linear_leastsq().
Returns:
smooth_image: The best approximation of the input image possible
with the smoothing function.
residual: The root mean square residual returned
by iterative_rej_linear_leastsq()
"""
if outlier_threshold is None:
outlier_threshold = self.outlier_threshold
if max_iterations is None:
max_iterations = self.max_iterations
if num_x_terms is None:
num_x_terms = self.num_x_terms
if num_y_terms is None:
num_y_terms = self.num_y_terms
matrix = self._get_smoothing_matrix(num_x_terms,
num_y_terms,
*image.shape)
fit_coef = iterative_rej_linear_leastsq(
matrix,
image.flatten(),
outlier_threshold=outlier_threshold,
max_iterations=max_iterations
)[0]
smooth_image = matrix.dot(fit_coef).reshape(image.shape)
return smooth_image
#pylint: enable=arguments-differ
[docs]class PolynomialImageSmoother(SeparableLinearImageSmoother):
"""
Smooth image is modeled as a polynomial in x times another polynomial in y.
"""
@staticmethod
[docs] def _get_powerlaw_pixel_integrals(power, resolution):
"""
Return the integrals over one pixel dimension of x^power for each pixel.
Args:
power: The power of the corresponding coordinate of the term we
are integrating.
resolution: The resolution of the image in the dimension in which to
calculate the pixel integrlas.
Returns:
integrals: A 1-D scipy array with the i-th entry being the
integral of the scaled x coordinate to the given power over
each pixel.
"""
pix_left = scipy.arange(resolution)
return (
(2.0 * (pix_left + 1) / resolution - 1)**(power + 1)
-
(2.0 * pix_left / resolution - 1)**(power + 1)
) / (power + 1)
get_x_pixel_integrals = _get_powerlaw_pixel_integrals
get_y_pixel_integrals = _get_powerlaw_pixel_integrals
[docs]class SplineImageSmoother(SeparableLinearImageSmoother):
"""Smooth image is modeled as a product of cubic splines in x and y."""
@staticmethod
[docs] def get_spline_pixel_integrals(node_index,
resolution,
num_nodes,
spline_degree):
"""
Return the integrals over one pixel dimension of a basis spline.
The spline basis functions are defined as an interpolating spline (i.e.
no smoothing) over y values that are one for the `node_index`-th node
and zero everywhere else.
Args:
node_index: The node at which the spline should evaluate to
one. All other nodes get a value of zero.
resolution: The resolution of the image in the dimension in which
to calculate the pixel integrlas.
num_nodes: The number of nodes in the spline. The image is scaled
to have a dimension of `num_nodes - 1` and nodes are set at
integer values.
spline_degree: The degree of the spline to use.
Returns:
integrals: A 1-D scipy array with the i-th entry being the
integral of the spline over the i-th pixel.
"""
interp_y = scipy.zeros(num_nodes)
interp_y[node_index] = 1.0
integrate = scipy.interpolate.InterpolatedUnivariateSpline(
scipy.arange(num_nodes),
interp_y,
k=spline_degree
).antiderivative()
cumulative_integrals = integrate(scipy.arange(resolution + 1)
*
((num_nodes - 1) / resolution))
return cumulative_integrals[1:] - cumulative_integrals[:-1]
[docs] def get_x_pixel_integrals(self, param_ind, x_resolution):
"""
Return integrals of the param_ind-th x direction spline basis function.
Args: See SeparableLinearImageSmoother.get_x_pixel_integrals()
Returns:
See SeparableLinearImageSmoother.get_x_pixel_integrals()
"""
return self.get_spline_pixel_integrals(param_ind,
x_resolution,
self.num_x_nodes,
self.spline_degree)
[docs] def get_y_pixel_integrals(self, param_ind, y_resolution):
"""
Return integrals of the param_ind-th y direction spline basis function.
Args: See SeparableLinearImageSmoother.get_y_pixel_integrals()
Returns:
See SeparableLinearImageSmoother.get_y_pixel_integrals()
"""
return self.get_spline_pixel_integrals(param_ind,
y_resolution,
self.num_y_nodes,
self.spline_degree)
[docs] def __init__(self,
*,
num_x_nodes=None,
num_y_nodes=None,
spline_degree=3,
**kwargs):
"""
Set-up spline interpolation with the given number of nodes.
Args:
num_x_nodes: The number of spline nodes for the x spline.
num_y_nodes: The number of spline nodes for the y spline.
kwargs: Forwarded directly
to SeparableLinearImageSmoother.__init__() after
`num_x_terms = num_x_nodes` and `num_y_terms = num_y_nodes`.
Returns:
None
"""
super().__init__(num_x_terms=num_x_nodes,
num_y_terms=num_y_nodes,
**kwargs)
self.num_x_nodes = num_x_nodes
self.num_y_nodes = num_y_nodes
self.spline_degree = spline_degree
#Different parameters are deliberate
#pylint: disable=arguments-differ
[docs] def _apply_smoothing(self,
image,
*,
num_x_nodes=None,
num_y_nodes=None,
**kwargs):
"""
Handle change in interpolation nodes needed by integrals functions.
Args: See SeparableLinearImageSmoother._apply_smoothing() except the
names of num_x_terms and num_y_terms have been changed to
num_x_nodes and num_y_nodes respectively.
Returns:
None
"""
if num_x_nodes is not None:
self.num_x_nodes = num_x_nodes
if num_y_nodes is not None:
self.num_y_nodes = num_y_nodes
return super()._apply_smoothing(image,
num_x_terms=num_x_nodes,
num_y_terms=num_y_nodes,
**kwargs)
#pylint: enable=arguments-differ
[docs]class WrapFilterAsSmoother(ImageSmoother):
"""Wrap one of the scipy or astropy image filters in a smoother."""
[docs] def __init__(self,
smoothing_filter,
*,
bin_factor=None,
zoom_interp_order=None,
**filter_config):
"""
Apply the given filter through the ImageSmoother interface.
Args:
smoothing_filter: The filter to apply to smooth images.
bin_factor: See ImageSmoother.smooth().
zoom_interp_order: See ImageSmoother.smooth().
filter_config: Any arguments to pass to the filter when applying,
unless overwritten by arguments to smooth() or detrend().
Returns:
None
"""
super().__init__(bin_factor=bin_factor,
zoom_interp_order=zoom_interp_order)
self.filter_config = filter_config
self.filter = smoothing_filter
[docs] def _apply_smoothing(self, image, **kwargs):
"""
Smooth the given image with the filter supplied on construction.
Args:
image: The image to smooth.
kwargs: Additional or replacement arguments to pass to the filter
when applying.
"""
filter_kwargs = self.filter_config
filter_kwargs.update(kwargs)
return self.filter(image, **filter_kwargs)
[docs]class ChainSmoother(ImageSmoother):
"""
Combine more than one smoothers, each is applied sequentially.
Works much like a list of smoothers, except it adds smooth and detrend
methods.
Attrs:
smoothing_chain: The current list of smoothers and the order in which
they are applied. The first will be applied to the input image, the
second will be applied to the result of the first etc.
"""
[docs] def __init__(self, *smoothers, **kwargs):
"""
Create a chain combining the given smoothers in the given order.
Args:
smoothers: A list of the image smoothers to combine.
kwargs: Any arguments to pass to parent constructor.
"""
super().__init__(**kwargs)
self.smoothing_chain = []
self.extend(smoothers)
[docs] def append(self, smoother):
"""Add a new smoother to the end of the sequence."""
assert isinstance(smoother, ImageSmoother)
self.smoothing_chain.append(smoother)
[docs] def extend(self, smoothers):
"""Add multiple smoothers to the end of the chain."""
if isinstance(smoothers, ChainSmoother):
self.smoothing_chain.extend(smoothers.smoothing_chain)
else:
for smth in smoothers:
assert isinstance(smth, ImageSmoother)
self.smoothing_chain.extend(smoothers)
[docs] def insert(self, position, smoother):
"""Like list insert."""
assert isinstance(smoother, ImageSmoother)
self.smoothing_chain.insert(position, smoother)
[docs] def remove(self, smoother):
"""Like list remove."""
self.smoothing_chain.remove(smoother)
[docs] def pop(self, position=-1):
"""Like list pop."""
self.smoothing_chain.pop(position)
[docs] def clear(self):
"""Like list clear."""
self.smoothing_chain.clear()
[docs] def __delitem__(self, position):
"""Delete smoothe at position."""
del self.smoothing_chain[position]
[docs] def __setitem__(self, position, smoother):
"""Replace the smoother at position."""
self.smoothing_chain[position] = smoother
#It makes no sense to take configuration argumens.
#pylint: disable=arguments-differ
[docs] def _apply_smoothing(self, image):
"""Smooth the given image using the current chain of smoothers."""
smooth_image = image
for smoother in self.smoothing_chain:
smooth_image = smoother.smooth(smooth_image)
return smooth_image
#pylint: enable=arguments-differ