"""A collection of functions for working with pipeline images."""
import os.path
import logging
from astropy.io import fits
from astropy.coordinates import SkyCoord
from astropy.visualization import ZScaleInterval
from PIL import Image
import numpy
import scipy.interpolate
from astrowisp.utils.file_utilities import (
prepare_file_output,
get_fname_pattern_substitutions,
)
from autowisp.pipeline_exceptions import BadImageError
_logger = logging.getLogger(__name__)
git_id = "$Id: 688e202458f7084d6c407c222ef25a2570babe21 $"
# pylint: disable=anomalous-backslash-in-string
# Triggers on doxygen commands.
[docs]
def zoom_image(image, zoom, interp_order):
"""
Increase the resolution of an image using flux conserving interpolation.
Interpolation is performed using the following recipe:
1. create a cumulative image (C), i.e. C(x, y) = sum(
image(x', y'), {x', 0, x}, {y', 0, y}). Note that C's x and y
resolutions are both bigger than image's by one with all entries in
the first row and the first column being zero.
2. Interpolate the cumulative image using a bivariate spline to get a
continuous cumulative flux F(x, y).
3. Create the final image I by setting each pixel to the flux implied
by F(x, y) from step 2, i.e. if zx is the zoom factor along x and zy
is the zoom factor along y::
I(x, y) = F((x+1)/z, (y+1)/z)
- F((x+1)/z, y/z)
- F(x/z, (y+1)/z)
+ F(x/z, y/z)
Since this is a flux conserving method, zooming and then binning an image
reproduces the original image with close to machine precision.
Args:
image: The image to zoom.
zoom: The factor(s) by which to zoom the image. Should be either an
integer defining a common zoom factor both dimensions or a pair of
numbers, specifying the zoom along each axis (y first, then x).
interp_order: The order of the interpolation of the cumulative array.
Returns:
2-D array:
The zoomed image.
"""
try:
x_zoom, y_zoom = zoom
except TypeError:
x_zoom = y_zoom = zoom
if x_zoom == y_zoom == 1:
return image
y_res, x_res = image.shape
# False positive
# pylint: disable=no-member
cumulative_image = numpy.empty((y_res + 1, x_res + 1))
# pylint: enable=no-member
cumulative_image[0, :] = 0
cumulative_image[:, 0] = 0
# False positive
# pylint: disable=no-member
cumulative_image[1:, 1:] = numpy.cumsum(numpy.cumsum(image, axis=0), axis=1)
# pylint: enable=no-member
try:
spline_kx, spline_ky = interp_order
except TypeError:
spline_kx = spline_ky = interp_order
cumulative_flux = scipy.interpolate.RectBivariateSpline(
# False positive
# pylint: disable=no-member
numpy.arange(y_res + 1),
numpy.arange(x_res + 1),
# pylint: enable=no-member
cumulative_image,
kx=spline_kx,
ky=spline_ky,
)
cumulative_image = cumulative_flux(
# False positive
# pylint: disable=no-member
numpy.arange(y_res * y_zoom + 1) / y_zoom,
numpy.arange(x_res * x_zoom + 1) / x_zoom,
# pylint: enable=no-member
grid=True,
)
# False positive
# pylint: disable=no-member
return numpy.diff(numpy.diff(cumulative_image, axis=0), axis=1)
# pylint: enable=no-member
# pylint: enable=anomalous-backslash-in-string
[docs]
def bin_image(image, bin_factor):
"""
Bins the image to a lower resolution (must be exact factor of image shape).
The output pixels are the sum of the pixels in each bin.
Args:
image: The image to bin.
bin_factor: Either a single integer in which case this is the binning
in both directions, or a pair of integers, specifying different
binnin in each direction.
Returns:
2-D array:
The binned image with a resolution decreased by the binning factor
for each axis, which has the same total flux as the input image.
"""
try:
x_bin_factor, y_bin_factor = bin_factor
except TypeError:
x_bin_factor = y_bin_factor = bin_factor
if x_bin_factor == y_bin_factor == 1:
return image
y_res, x_res = image.shape
assert x_res % x_bin_factor == 0
assert y_res % y_bin_factor == 0
return (
image.reshape(
(
y_res // y_bin_factor,
y_bin_factor,
x_res // x_bin_factor,
x_bin_factor,
)
)
.sum(-1)
.sum(1)
)
[docs]
def zscale_image(image_data):
"""Return the given image ZScaled to 8 bits."""
zscale_min, zscale_max = ZScaleInterval().get_limits(image_data)
return (
255
* (
# False positive
# pylint: disable=no-member
numpy.minimum(numpy.maximum(zscale_min, image_data), zscale_max)
# pylint: enable=no-member
- zscale_min
)
/ (zscale_max - zscale_min)
).astype(
# False positive
# pylint: disable=no-member
numpy.uint8
# pylint: enable=no-member
)
[docs]
def create_snapshot(
fits_fname,
snapshot_fname_pattern,
*,
image_index=0,
overwrite=False,
skip_existing=False,
create_directories=True,
):
"""
Create a snapshot (e.g. JPEG image) from a fits file in zscale.
Args:
fits_fname(str): The FITS image to create a snapshot of.
snapshot_fname_pattern(str): A %-substitution pattern that when
filled using the header and the extra keyword FITS_ROOT (set to the
filename of the FITS file with path and extension removed) expands
to the filename to save the snapshot as.
image_index(int): Offset from the first non-empty HDU in the FITS
file to make a snapshot of.
overwrite(bool): If a file called `snapshot_fname` already exists, an
`OSError` is raised if this argument and `skip_existing` are both
False (default). That file is overwritten if this argument is True
and `skip_existing` is False.
skip_existing(bool): If True and a file already exists with the name
determined for the snapshot, this function exists immediately
without error.
create_directories(bool): Whether the script is allowed to create
the directories where the output snapshot will be stored. `OSError`
is raised if this argument is False and the destination directory
does not exist.
Returns:
None
"""
with fits.open(fits_fname, "readonly") as fits_image:
# False positive
# pylint: disable=no-member
fits_hdu = fits_image[
image_index if fits_image[0].header["NAXIS"] else image_index + 1
]
# pylint: enable=no-member
snapshot_fname = (
snapshot_fname_pattern
% get_fname_pattern_substitutions(fits_fname, fits_hdu.header)
)
snapshot_exists = prepare_file_output(
snapshot_fname,
allow_existing=overwrite,
allow_dir_creation=create_directories,
delete_existing=overwrite,
)
if snapshot_exists and skip_existing:
_logger.info(
"Snapshot %s already exists, skipping!", repr(snapshot_fname)
)
return
scaled_data = zscale_image(fits_hdu.data)
Image.fromarray(scaled_data[::-1, :], "L").save(snapshot_fname)
_logger.debug("Creating snapshot: %s", repr(snapshot_fname))