Source code for autowisp.processing_steps.fit_source_extracted_psf_map
#!/usr/bin/env python3
"""Fit a smooth dependence of source extracted PSF parameters."""
import logging
from collections import namedtuple
import numpy
from autowisp.multiprocessing_util import setup_process
from autowisp.file_utilities import find_dr_fnames
from autowisp import DataReductionFile
from autowisp.fit_expression import (
Interface as FitTermsInterface,
iterative_fit,
)
from autowisp.evaluator import Evaluator
from autowisp.processing_steps.manual_util import (
ManualStepArgumentParser,
ignore_progress,
)
_logger = logging.getLogger(__name__)
input_type = "dr"
[docs]
def parse_command_line(*args):
"""Return the parsed command line arguments."""
parser = ManualStepArgumentParser(
description=__doc__,
input_type=("" if args else input_type),
inputs_help_extra="The DR files must already contain astrometry.",
add_component_versions=("srcextract", "catalogue", "skytoframe"),
)
parser.add_argument(
"--srcextract-only-if",
default="True",
help="Expression involving the header of the input images that "
"evaluates to True/False if a particular image from the specified "
"image collection should/should not be processed.",
)
parser.add_argument(
"--srcextract-psf-params",
nargs="+",
default=None,
help="List of the parameters describing PSF shapes of the extracted "
"sources to fit a smooth dependence for. If left unspecified, smooth "
"map will be fit for (`'S'`, `'D'`, `'K'`) or (`'fwhm'`, `'round'`, "
"`'pa'`), whichever is available.",
)
parser.add_argument(
"--srcextract-psfmap-terms",
default="O3{x, y}*O1{phot_g_mean_mag}",
help="An expression involving source extraction and/or catalogue "
"variables for the terms to include in the smoothing fit.",
)
parser.add_argument(
"--srcextract-psfmap-weights",
default=None,
type=str,
help="An expression involving source extraction and/or catalogue "
"variables for the weights to use for the smoothing fit.",
)
parser.add_argument(
"--srcextract-psfmap-error-avg",
default="median",
help="How to average fitting residuals for outlier rejection. ",
)
parser.add_argument(
"--srcextract-psfmap-rej-level",
type=float,
default=5.0,
help="How far away from the fit should a point be before it is rejected"
" in units of error_avg.",
)
parser.add_argument(
"--srcextract-psfmap-max-rej-iter",
type=int,
default=20,
help="The maximum number of rejection/re-fitting iterations to perform."
"If the fit has not converged by then, the latest iteration is "
"accepted. Default: %(default)s",
)
return parser.parse_args(*args)
[docs]
def get_predictors_and_weights(
matched_sources, fit_terms_expression, weights_expression
):
"""Return the matrix of predictors to use for fitting."""
_logger.debug("Matched columns: %s", repr(matched_sources.columns))
if weights_expression is None:
return (FitTermsInterface(fit_terms_expression)(matched_sources), None)
# TODO fix matched_sources to records in Evaluator not here
return (
FitTermsInterface(fit_terms_expression)(matched_sources),
Evaluator(matched_sources.to_records(index=False))(weights_expression),
)
[docs]
def get_psf_param(matched_sources, psf_parameters):
"""Return a numpy structured array of the PSF parameters."""
result = numpy.empty(
len(matched_sources),
dtype=[(param, numpy.float64) for param in psf_parameters],
)
for param in psf_parameters:
_logger.debug("Setting PSF param %s", repr(param))
_logger.debug("Matched columns: %s", repr(matched_sources.columns))
result[param] = matched_sources[param]
return result
[docs]
def detect_psf_parameters(matched_sources):
"""Return the default PSF parameters to fit for he given DR file."""
for try_psf_params in [
("S", "D", "K"),
("s", "d", "k"),
("fwhm", "round", "pa"),
]:
found_all = True
for param in try_psf_params:
if param not in matched_sources.columns:
found_all = False
if found_all:
return try_psf_params
return None
[docs]
def smooth_srcextract_psf(
dr_fname, configuration, mark_start, mark_end, **path_substitutions
):
"""
Fit PSF parameters as polynomials of srcextract and catalogue info.
Args:
dr_fname(str): The name of the DR file to smooth the source extracted
parameters for.
configuration(NamedTuple): Configuration on how to do the smoothing.
Should include the following attributes:
psf_parameters([str]): A list of the variables from the source
extracted datasets to smooth. If None, an attempt is made to
auto detect the parameters.
fit_terms_expression(str): A fitting terms expression defining
the terms to include in the fit.
weights_expression(str): An expression involving source
extraction and/or catalogue variables for the weights to use for
the smoothing fit.
error_avg: See iterative_fit().
rej_level: See iterative_fit().
max_rej_iter: See iterative_fit().
mark_start(callable): Invoked with the name of the DR file before the
contents of the file are updated.
mark_end(callable): Invoked with the name of the DR file after
successfully completing all updates to the DR file.
path_substitutions: Any substitutions required to resolve the
path to extracted sources, catalogue sources and the
destinationdatasets and attributes created by this method.
Returns:
None
"""
with DataReductionFile(dr_fname, "r+") as dr_file:
matched_sources = dr_file.get_matched_sources(**path_substitutions)
predictors, weights = get_predictors_and_weights(
matched_sources,
configuration.fit_terms_expression,
configuration.weights_expression,
)
_logger.debug(
"Predictors (%sx%d)}: %s", *predictors.shape, repr(predictors)
)
if weights is not None:
_logger.debug(
"Weights %s: %s", format(weights.shape), repr(weights)
)
else:
_logger.debug("Not using weights")
fit_results = {"coefficients": {}, "fit_res2": {}, "num_fit_src": {}}
psf_parameters = configuration.psf_parameters
if psf_parameters is None:
psf_parameters = detect_psf_parameters(matched_sources)
if psf_parameters is None:
raise IOError(
f"Matched sources in {dr_file.filename} do not contain a full "
"set of either fistar or hatphot PSF parameters."
)
psf_param = get_psf_param(matched_sources, psf_parameters)
for param_name in psf_parameters:
(
fit_results["coefficients"][param_name],
fit_results["fit_res2"][param_name],
fit_results["num_fit_src"][param_name],
) = iterative_fit(
predictors,
psf_param[param_name],
weights=weights,
error_avg=configuration.error_avg,
rej_level=configuration.rej_level,
max_rej_iter=configuration.max_rej_iter,
fit_identifier=f"Extracted sources PSF {param_name:s} map",
)
if fit_results["coefficients"][param_name] is None:
mark_start(dr_fname)
mark_end(dr_fname, -2)
return
mark_start(dr_fname)
dr_file.save_source_extracted_psf_map(
fit_results=fit_results,
fit_configuration=configuration,
**path_substitutions,
)
mark_end(dr_fname)
[docs]
def get_dr_substitutions(configuration):
"""Return the path substitutions needed to resolve input datasets."""
return {
what + "_version": configuration[what + "_version"]
for what in ["srcextract", "catalogue", "skytoframe"]
}
[docs]
def fit_source_extracted_psf_map(
dr_collection, start_status, configuration, mark_start, mark_end
):
"""Fit a smooth dependence of source extraction PSF for a DR collection."""
assert start_status is None
# This defines a type not variable
# pylint: disable=invalid-name
SmoothingConfigType = namedtuple(
"SmoothingConfigType",
[
"psf_parameters",
"fit_terms_expression",
"weights_expression",
"error_avg",
"rej_level",
"max_rej_iter",
],
)
# pylint: enable=invalid-name
smoothing_config = SmoothingConfigType(
psf_parameters=configuration["srcextract_psf_params"],
fit_terms_expression=configuration["srcextract_psfmap_terms"],
weights_expression=configuration["srcextract_psfmap_weights"],
**{
fit_config: configuration["srcextract_psfmap_" + fit_config]
for fit_config in ["error_avg", "rej_level", "max_rej_iter"]
},
)
for dr_fname in dr_collection:
smooth_srcextract_psf(
dr_fname=dr_fname,
configuration=smoothing_config,
mark_start=mark_start,
mark_end=mark_end,
**get_dr_substitutions(configuration),
)
[docs]
def cleanup_interrupted(interrupted, configuration):
"""Remove the source extracted PSF map from the given DR file."""
path_substitutions = get_dr_substitutions(configuration)
for dr_fname, status in interrupted:
assert status == 0
with DataReductionFile(dr_fname, "r+") as dr_file:
dr_file.delete_dataset("srcextract.psf_map", **path_substitutions)
return -1
[docs]
def has_astrometry(dr_fname, substitutions):
"""Check if the DR file contains a sky-to-frame transformation."""
with DataReductionFile(dr_fname, mode="r") as dr_file:
try:
dr_file.check_for_dataset(
"skytoframe.coefficients", **substitutions
)
return True
except IOError:
return False
[docs]
def main():
"""Run the step from the command line."""
configuration = parse_command_line()
setup_process(task="main", **configuration)
dr_substitutions = get_dr_substitutions(configuration)
fit_source_extracted_psf_map(
[
dr_fname
for dr_fname in find_dr_fnames(
configuration.pop("dr_files"),
configuration.pop("srcextract_only_if"),
)
if has_astrometry(dr_fname, dr_substitutions)
],
None,
configuration,
ignore_progress,
ignore_progress,
)
if __name__ == "__main__":
main()