"""A collection of functions for working with pipeline images."""
from os.path import exists
from astropy.io import fits
from astropy.coordinates import SkyCoord
import scipy
import scipy.interpolate
from superphot_pipeline.pipeline_exceptions import BadImageError
git_id = '$Id: e26feed8afe1e677961590c79bd98954611331bd $'
[docs]def read_image_components(fits_fname,
*,
read_image=True,
read_error=True,
read_mask=True,
read_header=True):
"""
Read image, its error estimate, mask and header from pipeline FITS file.
Args:
fits_fname: The filename of the FITS file to read the componets of.
Must have been produced by the pipeline.
read_image: Should the pixel values of the primary image be read.
read_error: Should the error extension be searched for and read.
read_mask: Should the mask extension be searched for and read.
read_header: Should the header of the image extension be returned.
Returns:
image: The primary image in the file. Always present.
error: The error estimate of image, identified by IMAGETYP=='error'.
Set to None if none of the extensions have IMAGETYP=='error'. This
is omitted from the output if `read_error == False`.
mask: A bitmask of quality flags for each image pixel (identified
by IMAGETYP='mask'). Set to None if none of the extensions
have IMAGETYP='mask'. This is omitted from the output if
`read_mask == False`.
header: The header of the image HDU in the file. This is omitted from
the output if `read_header == False`.
"""
image = error = mask = header = None
with fits.open(fits_fname, mode='readonly') as input_file:
for hdu_index, hdu in enumerate(input_file):
if hdu.header['NAXIS'] == 0:
continue
if image is None:
image = hdu.data if read_image else True
if read_header:
header = hdu.header
else:
if hdu.header['IMAGETYP'] == 'error':
error = hdu.data
elif hdu.header['IMAGETYP'] == 'mask':
mask = hdu.data
if mask.dtype.itemsize != 1:
raise BadImageError(
(
'Mask image (hdu #%d) of %s had data type %s '
'(not int8)'
)
%
(hdu_index, fits_fname, mask.dtype)
)
if (
image is not None
and
(error is not None or not read_error)
and
(mask is not None or not read_mask)
):
break
return (
((image,) if read_image else ())
+
((error,) if read_error else ())
+
((mask,) if read_mask else())
+
((header,) if read_header else())
)
#pylint: disable=anomalous-backslash-in-string
#Triggers on doxygen commands.
[docs]def zoom_image(image, zoom, interp_order):
"""
Increase the resolution of an image using flux conserving interpolation.
Interpolation is performed using the following recipe:
1. create a cumulative image (C), i.e. C(x, y) = sum(
image(x', y'), {x', 0, x}, {y', 0, y}). Note that C's x and y
resolutions are both bigger than image's by one with all entries in
the first row and the first column being zero.
2. Interpolate the cumulative image using a bivariate spline to get a
continuous cumulative flux F(x, y).
3. Create the final image I by setting each pixel to the flux implied
by F(x, y) from step 2, i.e. if zx is the zoom factor along x and zy
is the zoom factor along y::
I(x, y) = F((x+1)/z, (y+1)/z)
- F((x+1)/z, y/z)
- F(x/z, (y+1)/z)
+ F(x/z, y/z)
Since this is a flux conserving method, zooming and then binning an image
reproduces the original image with close to machine precision.
Args:
image: The image to zoom.
zoom: The factor(s) by which to zoom the image. Should be either an
integer defining a common zoom factor both dimensions or a pair of
numbers, specifying the zoom along each axis (y first, then x).
interp_order: The order of the interpolation of the cumulative array.
"""
try:
x_zoom, y_zoom = zoom
except TypeError:
x_zoom = y_zoom = zoom
if x_zoom == y_zoom == 1:
return image
y_res, x_res = image.shape
cumulative_image = scipy.empty((y_res + 1, x_res + 1))
cumulative_image[0, :] = 0
cumulative_image[:, 0] = 0
cumulative_image[1:, 1:] = scipy.cumsum(scipy.cumsum(image, axis=0), axis=1)
try:
spline_kx, spline_ky = interp_order
except TypeError:
spline_kx = spline_ky = interp_order
cumulative_flux = scipy.interpolate.RectBivariateSpline(
scipy.arange(y_res + 1),
scipy.arange(x_res + 1),
cumulative_image,
kx=spline_kx,
ky=spline_ky
)
cumulative_image = cumulative_flux(
scipy.arange(y_res * y_zoom + 1) / y_zoom,
scipy.arange(x_res * x_zoom + 1) / x_zoom,
grid=True
)
return scipy.diff(scipy.diff(cumulative_image, axis=0), axis=1)
#pylint: enable=anomalous-backslash-in-string
[docs]def bin_image(image, bin_factor):
"""
Bins the image to a lower resolution (must be exact factor of image shape).
The output pixels are the sum of the pixels in each bin.
Args:
image: The image to bin.
bin_factor: Either a single integer in which case this is the binning
in both directions, or a pair of integers, specifying different
binnin in each direction.
Returns:
binned_image: The binned image with a resolution decreased by the
binning factor for each axis, which has the same total flux as the
input image.
"""
try:
x_bin_factor, y_bin_factor = bin_factor
except TypeError:
x_bin_factor = y_bin_factor = bin_factor
if x_bin_factor == y_bin_factor == 1:
return image
y_res, x_res = image.shape
assert x_res % x_bin_factor == 0
assert y_res % y_bin_factor == 0
return image.reshape((y_res // y_bin_factor,
y_bin_factor,
x_res // x_bin_factor,
x_bin_factor)).sum(-1).sum(1)