Source code for autowisp.fits_utilities
"""General use convenience functions for working with FITS images."""
from os import path
import logging
from numpy import nan
from astropy.io import fits
from autowisp.pipeline_exceptions import BadImageError, ImageMismatchError
from autowisp.data_reduction.data_reduction_file import DataReductionFile
_logger = logging.getLogger(__name__)
[docs]
def read_image_components(
    fits_fname,
    *,
    read_image=True,
    read_error=True,
    read_mask=True,
    read_header=True,
):
    """
    Read image, its error estimate, mask and header from pipeline FITS file.

    The first HDU with non-zero ``NAXIS`` is treated as the image. Subsequent
    HDUs with non-zero ``NAXIS`` are classified by their ``IMAGETYP`` keyword;
    HDUs without that keyword, or with any other value, are skipped.

    Args:
        fits_fname:    The filename of the FITS file to read the components
            of. Must have been produced by the pipeline.

        read_image:    Should the pixel values of the primary image be read.

        read_error:    Should the error extension be searched for and read.

        read_mask:    Should the mask extension be searched for and read.

        read_header:    Should the header of the image extension be returned.

    Returns:
        (tuple):
            2-D array:
                The primary image in the file. This is omitted from the
                output if ``read_image == False``.

            2-D array:
                The error estimate of image, identified by
                ``IMAGETYP=='error'``. Set to None if none of the extensions
                have ``IMAGETYP=='error'``. This is omitted from the output if
                ``read_error == False``.

            2-D array:
                A bitmask of quality flags for each image pixel (identified by
                ``IMAGETYP='mask'``). Set to None if none of the extensions
                have ``IMAGETYP='mask'``. This is omitted from the output if
                ``read_mask == False``.

            astropy.io.fits.Header:
                The header of the image HDU in the file. This is omitted from
                the output if ``read_header == False``.

    Raises:
        BadImageError:    If a mask extension is encountered whose pixel data
            type is not exactly one byte wide.
    """

    image = error = mask = header = None
    with fits.open(fits_fname, mode="readonly") as input_file:
        for hdu_index, hdu in enumerate(input_file):
            # HDUs without any data axes carry no image of any kind.
            if hdu.header["NAXIS"] == 0:
                continue

            if image is None:
                # When the pixels are not requested, ``True`` is only a marker
                # that the image HDU has been found (it is never returned).
                image = hdu.data if read_image else True
                if read_header:
                    header = hdu.header
            else:
                # Use .get() so an extension lacking IMAGETYP is skipped
                # instead of aborting the read with a KeyError.
                image_type = hdu.header.get("IMAGETYP")
                if image_type == "error":
                    error = hdu.data
                elif image_type == "mask":
                    mask = hdu.data
                    if mask.dtype.itemsize != 1:
                        raise BadImageError(
                            f"Mask image (hdu #{hdu_index:d}) of {fits_fname} "
                            f"had data type {mask.dtype!s} (not int8)."
                        )

            # Stop scanning as soon as every requested component is in hand.
            if (
                image is not None
                and (error is not None or not read_error)
                and (mask is not None or not read_mask)
            ):
                break

    return (
        ((image,) if read_image else ())
        + ((error,) if read_error else ())
        + ((mask,) if read_mask else ())
        + ((header,) if read_header else ())
    )
[docs]
def get_primary_header(fits_image, add_filename_keywords=False):
    """
    Return the primary header of the given image (filename or opened).

    If ``fits_image`` is not an opened FITS file and cannot be opened as one,
    it is opened as a data reduction file instead and the frame header stored
    there is returned (``add_filename_keywords`` is not applied in that case).

    Args:
        fits_image:    Either the filename or open FITS image to get the
            primary header of.

        add_filename_keywords:    If True, a copy of the header is returned
            with a ``RAWFNAME`` keyword added, set to the base name of the
            file with any trailing ``.fz`` and ``.fits`` extensions removed.

    Returns:
        fits.Header:
            The first header in the input file with non-zero NAXIS or with
            ``IMAGETYP == 'mphotref'``.

    Raises:
        IOError:    If no HDU of the FITS file satisfies the above condition.
    """

    if not isinstance(fits_image, fits.HDUList):
        # Guard only the open itself: failures while scanning a successfully
        # opened FITS file (e.g. the IOError raised below) must propagate
        # rather than trigger the data reduction file fallback.
        try:
            opened_fits = fits.open(fits_image, "readonly")
        except OSError:
            with DataReductionFile(fits_image, "r") as dr_file:
                return dr_file.get_frame_header()
        with opened_fits:
            return get_primary_header(opened_fits, add_filename_keywords)

    for hdu in fits_image:
        if hdu.header["NAXIS"] != 0 or hdu.header.get("IMAGETYP") == "mphotref":
            result = hdu.header
            if add_filename_keywords:
                # Work on a copy so the header of the opened file is not
                # modified.
                result = result.copy()
                base_fname = path.basename(fits_image.fileinfo(0)["filename"])
                # Order matters: strip the compression suffix first so that
                # ``name.fits.fz`` reduces to ``name``.
                for ext in [".fz", ".fits"]:
                    if base_fname.endswith(ext):
                        base_fname = base_fname[: -len(ext)]
                result["RAWFNAME"] = base_fname
            return result

    raise IOError(f"No valid HDU found in {fits_image!r}!")
[docs]
def update_stack_header(master_header, frame_header, filename, first_time):
    """
    Update the master header per header from one of the individual frames.

    Should be called once for each frame participating in the stack with the
    second argument being the header of that frame. The first argument
    should initially be an empty FITS header, which will get updated each
    time.

    Args:
        master_header:    The header to use for the stacked master frame,
            describing the frames being stacked. On exit contains only
            keywords shared with frame header and only if their
            corresponding values match.

        frame_header:    The header of an individual frame being added to
            the stack.

        filename:    The filename where the header was read from. Only used
            for logging.

        first_time:    Is this the first time this function is called for
            the current stack? Subsequent calls remove keywords that are
            discrepant between current header and the new frame being
            stacked.

    Returns:
        None

    Raises:
        ImageMismatchError:    If ``IMAGETYP`` of the new frame is missing or
            differs from the one already in the master header.
    """

    if first_time:
        # Seed the master header with all cards of the first frame, except
        # completely blank ones.
        master_header.extend(
            filter(lambda c: tuple(c) != ("", "", ""), frame_header.cards)
        )
        return

    _logger.debug("Checking master header against %s", filename)
    # NaN never compares equal to anything, so keywords missing from the
    # frame header are always flagged as mismatched.
    mismatch_keys = {
        k for k in master_header if master_header[k] != frame_header.get(k, nan)
    } | (set(master_header) - set(frame_header))

    # Frames of different types must never be combined into one master.
    if "IMAGETYP" in mismatch_keys:
        raise ImageMismatchError(
            "Attempting to combine images with "
            f"IMAGETYP = {master_header['IMAGETYP']} and "
            f"IMAGETYP={frame_header.get('IMAGETYP')} "
            "into a master!"
        )

    _logger.debug("Deleting:\n%s", "\n".join(mismatch_keys))
    _logger.debug("Starting with %d cards", len(master_header.cards))
    for key in mismatch_keys:
        master_header.remove(key, remove_all=True)
    _logger.debug("%d cards remain", len(master_header.cards))