#!/usr/bin/env python3
"""Stack a collection of images to a master frame."""
from functools import partial
from os.path import exists
from os import remove
import logging
import numpy
from astropy.io import fits
from configargparse import Action
from autowisp.multiprocessing_util import setup_process
from autowisp.image_calibration.mask_utilities import mask_flags
from autowisp.image_calibration.master_maker import MasterMaker
from autowisp.processing_steps.manual_util import (
ManualStepArgumentParser,
ignore_progress,
)
from autowisp.file_utilities import find_fits_fnames
from autowisp.fits_utilities import get_primary_header
input_type = "calibrated"
_logger = logging.getLogger(__name__)
fail_reasons = {"discarded": -2}
# Interface define by argparse
# pylint: disable=too-few-public-methods
[docs]
class ParseAverageAction(Action):
"""Parse the specification of the averaging function on the command line."""
[docs]
def __call__(self, parser, namespace, values, option_string=None):
"""Set the callable to use for averaging."""
assert len(values) == 1
if ":" not in values[0]:
result = getattr(numpy, "nan" + values[0])
else:
func, level = values[0].split(":")
result = partial(getattr(numpy, "nan" + func), q=float(level))
setattr(namespace, self.dest, result)
# pylint: enable=too-few-public-methods
[docs]
def get_command_line_parser(
*args,
default_threshold=(5.0,),
single_master=True,
default_min_valid_values=5,
default_max_iter=20,
):
"""Return a command line parser with all arguments added."""
parser = ManualStepArgumentParser(
description=__doc__, input_type="" if args else input_type
)
parser.add_argument(
"--average-func",
default="median",
type=lambda f: getattr(numpy, "nan" + f),
help="The function to use for calculating the average of pixel values "
"accross images. For quantile and percentile, the desired level is "
"specified by adding ``:<q>`` at the end of the function name(e.g."
"``quantile:0.7``).",
)
parser.add_argument(
"--outlier-threshold",
type=float,
nargs="+",
default=default_threshold,
help="When averaging the values of a given pixel among the input "
"images, values that are further than this value times the root mean "
"square devitaion from the average are rejected and the average is "
"recomputed iteratively until convergence or maximum number of "
"iterations.",
)
parser.add_argument(
"--min-valid-values",
type=int,
default=default_min_valid_values,
help="If rejecting outlier pixels results in fewer than this many "
"surviving pixels, the corresponding pixel gets a bad pixel mask in the"
"master.",
)
parser.add_argument(
"--max-iter",
type=int,
default=default_max_iter,
help="The maximum number of outlier rejection/averaging iterations to "
"allow. If not converged before then, a warning is issued and the "
"average if the last iteration is used.",
)
parser.add_argument(
"--exclude-mask",
choices=mask_flags.keys(),
nargs="+",
default=MasterMaker.default_exclude_mask,
help="A list of mask flags, any of which result in the corresponding "
"pixels being excluded from the averaging. Any mask flags not specified"
"are ignored, treated as clean. Note that ``'BAD'`` means any kind of"
" problem (e.g. saturated, hot/cold pixel, leaked etc.).",
)
parser.add_argument(
"--compress",
type=float,
default=16,
help="If zero, the final result is not compressed. Otherwise,"
"this is the quantization level used for compressing the image (see "
"`astropy.io.fits` documentation).",
)
parser.add_argument(
"--add-averaged-keywords",
nargs="+",
default=["JD-OBS"],
help="Specify any numeric-valued header keywords that should be "
"generated for the master by averaging the corresponding values from "
"the input frames using the same averaging as pixel values. By default "
"the outlier rejected average JD of the input frames is added.",
)
if single_master:
parser.add_argument(
"--min-valid-frames",
type=int,
default=10,
help="If there are fewer than this number of suitable frames to "
"stack in a given master, that master is not generated.",
)
parser.add_argument(
"--stacked-master-fname",
default="MASTERS/{IMAGE_TYPE}_{CAMSN}_{CLRCHNL}_{OBS-SESN}.fits.fz",
help="Filename for the master to generate if successful. Can "
"involve header substitutions, but should produce the same filename"
" for all input frames. If not, the behavior is undefined.",
)
return parser
[docs]
def parse_command_line(*args):
"""Return the parsed command line arguments."""
return get_command_line_parser(*args).parse_args(*args)
[docs]
def get_master_fname(
image_fname, configuration, fname_key="stacked_master_fname"
):
"""Return the name of the master the given image should contribute to."""
with fits.open(image_fname, "readonly") as first_image:
substitutions = dict(get_primary_header(first_image))
for arg in configuration:
if arg.upper() not in substitutions:
substitutions[arg.upper()] = configuration[arg]
return configuration[fname_key].format_map(substitutions)
[docs]
def stack_to_master(
image_collection, start_status, configuration, mark_start, mark_end
):
"""Stack the given frames to produce a single master frame."""
assert start_status is None
for image_fname in image_collection:
mark_start(image_fname)
master_fname = get_master_fname(image_collection[0], configuration)
success, discarded_frames = MasterMaker(
**{
arg: configuration[arg]
for arg in [
"outlier_threshold",
"average_func",
"min_valid_frames",
"min_valid_values",
"max_iter",
"exclude_mask",
"compress",
"add_averaged_keywords",
]
}
)(image_collection, master_fname)
for image_fname in image_collection:
mark_end(
image_fname,
fail_reasons["discarded"] if image_fname in discarded_frames else 1,
)
if success:
assert exists(master_fname)
header = get_primary_header(master_fname)
return {
"filename": master_fname,
"preference_order": f'JD_OBS - {header["JD-OBS"]}',
}
return None
[docs]
def cleanup_interrupted(interrupted, configuration):
"""Cleanup file system after partially creating stacked image(s)."""
master_fname = get_master_fname(interrupted[0][0], configuration)
_logger.info(
"Cleaning up partially created stack %s (%s)",
repr(master_fname),
repr(interrupted),
)
for image_fname, _ in interrupted:
assert master_fname == get_master_fname(image_fname, configuration)
if exists(master_fname):
remove(master_fname)
return -1
[docs]
def main():
"""Run the step from the command line."""
cmdline_config = parse_command_line()
setup_process(task="main", **cmdline_config)
stack_to_master(
list(find_fits_fnames(cmdline_config["calibrated_images"])),
None,
cmdline_config,
ignore_progress,
ignore_progress,
)
if __name__ == "__main__":
main()