Source code for autowisp.processing_steps.fit_magnitudes

#!/usr/bin/env python3

"""Apply magnitude fitting to hdf5 files"""

from types import SimpleNamespace
from itertools import count
import os
import logging

from sqlalchemy import func, select

from autowisp.multiprocessing_util import setup_process
from autowisp import magnitude_fitting, DataReductionFile
from autowisp.file_utilities import find_dr_fnames
from autowisp.catalog import ensure_catalog
from autowisp.processing_steps.manual_util import (
    ManualStepArgumentParser,
    ignore_progress,
    get_catalog_config,
)
from autowisp.database.interface import Session

# False positive due to unusual importing
# pylint: disable=no-name-in-module
from autowisp.database.data_model import MasterFile

# pylint: enable=no-name-in-module

input_type = "dr"
_logger = logging.getLogger(__name__)


[docs] def parse_command_line(*args): """Return the parsed command line arguments.""" parser = ManualStepArgumentParser( description=__doc__, input_type=("" if args else input_type), inputs_help_extra=( "The corresponding DR files must alread contain all " "photometric measurements." ), add_catalog={"prefix": "magfit"}, add_component_versions=( "skytoframe", "srcproj", "background", "shapefit", "apphot", "magfit", ), add_photref=True, allow_parallel_processing=True, ) parser.add_argument( "--magfit-only-if", default="True", help="Expression involving the header of the input images that " "evaluates to True/False if a particular image from the specified " "image collection should/should not be processed.", ) parser.add_argument( "--master-photref-fname", default=None, help="The name of a master photometric reference to use. If specified, " "the sintgle reference is ignored and magnitude fitting proceeds " "without any iterations.", ) parser.add_argument( "--master-photref-fname-format", default=( "MASTERS/mphotref_" "{FIELD}_{CLRCHNL}_{EXPTIME}sec_iter{magfit_iteration:03d}.fits" ), help="A format string involving a {magfit_iteration} substitution along" " with any variables from the header of the single photometric " "reference or passed through the path_substitutions arguments, that " "expands to the name of the file to save the master photometric " "reference for a particular iteration.", ) parser.add_argument( "--magfit-stat-fname-format", default=( "MASTERS/mfit_stat_{FIELD}_{CLRCHNL}_{EXPTIME}sec_iter" "{magfit_iteration:03d}.txt" ), help="Similar to ``master_photref_fname_format``, but defines the name" " to use for saving the statistics of a magnitude fitting iteration.", ) parser.add_argument( "--correction-parametrization", type=str, default="O3{phot_g_mean_mag, x, y}", help="A string that expands to the terms to include in the magnitude " "fitting correction.", ) parser.add_argument( "--mphotref-scatter-fit-terms", default="O2{phot_g_mean_mag}", help="Terms to include in the fit for the scatter when deciding which " "stars to include in the master.", ) parser.add_argument( "--reference-subpix", action="store_true", default=False, help="Should the magnitude fitting correction depend on the " "sub-pixel position of the source in the reference frame" "Default: %(default)s", ) parser.add_argument( "--fit-source-condition", type=str, default="isfinite(phot_g_mean_mag)", help="An expression involving catalog, reference and/or photometry " "variables which evaluates to zero if a source should be excluded and " "any non-zero value if it should be included in the magnitude fit.", ) parser.add_argument( "--grouping", default=None, help=( "An expression using catalog, and/or photometry variables which " "evaluates to several distinct values. Each value " "defines a separate fitting group (i.e. a group of sources which " "participate in magnitude fitting together, excluding sources " "belonging to other groups). Default: %(default)s" ), ) parser.add_argument( "--error-avg", default="weightedmean", help="How to average fitting residuals for outlier rejection.", ) parser.add_argument( "--rej-level", type=float, default=5.0, help="How far away from the fit should a point be before " "it is rejected in units of error_avg. Default: %(default)s", ) parser.add_argument( "--max-rej-iter", type=int, default=20, help="The maximum number of rejection/re-fitting iterations to perform." " If the fit has not converged by then, the latest iteration is " "accepted.", ) parser.add_argument( "--noise-offset", type=float, default=0.01, help="Additional offset to format magnitude error estimates when they " "are used to determine the fitting weights. ", ) parser.add_argument( "--max-mag-err", type=float, default=0.1, help="The largest the formal magnitude error is allowed " "to be before the source is excluded. Default: %(default)s", ) parser.add_argument( "--max-photref-change", type=float, default=1e-4, help="The maximum square average change of photometric reference " "magnitudes to consider the iterations converged.", ) parser.add_argument( "--max-magfit-iterations", type=int, default=5, help="The maximum number of iterations of deriving a master photometric" " referene and re-fitting to allow.", ) return parser.parse_args(*args)
[docs] def get_path_substitutions(configuration, sphotref_header): """Return the path substitutions to find magfit datasets.""" result = { what + "_version": configuration[what + "_version"] for what in ["shapefit", "srcproj", "apphot", "background", "magfit"] } if configuration["master_photref_fname"] is not None: for iteration in count(): if ( configuration["master_photref_fname_format"].format( **sphotref_header, **result, magfit_iteration=iteration ) == configuration["master_photref_fname"] ): result["magfit_iteration"] = iteration break if iteration >= configuration["max_magfit_iterations"]: raise ValueError( "Master photometric reference " f"{configuration['master_photref_fname']!r} does not appear" " to follow the specified filename format: " f"{configuration['master_photref_fname_format']!r}!" ) return result
[docs] def get_dr_fnames_and_catalog( dr_collection, configuration, sphotref_header, mark_start, mark_end ): """Return the DR files to process and the catalog.""" dr_fnames = sorted(dr_collection) catalog_sources, outliers, catalog_fname = ensure_catalog( dr_files=dr_fnames, configuration=get_catalog_config(configuration, "magfit"), return_metadata=False, skytoframe_version=configuration["skytoframe_version"], header=sphotref_header, ) for outlier_ind in reversed(outliers): outlier_dr = dr_fnames.pop(outlier_ind) _logger.warning( "Data reduction file %s has outlier pointing. Discarding!", outlier_dr, ) mark_start(outlier_dr) mark_end(outlier_dr, -2, final=True) return dr_fnames, catalog_sources, catalog_fname
[docs] def fit_magnitudes( dr_collection, start_status, configuration, mark_start, mark_end ): """Perform magnitude fitting for the given DR files.""" if start_status is None: start_status = -1 else: assert start_status % 2 == 1 with DataReductionFile( configuration["single_photref_dr_fname"], "r" ) as sphotref_dr_file: sphotref_header = sphotref_dr_file.get_frame_header() dr_fnames, catalog_sources, catalog_fname = get_dr_fnames_and_catalog( dr_collection, configuration, sphotref_header, mark_start, mark_end ) kwargs = { "fit_dr_filenames": dr_fnames, "configuration": SimpleNamespace( **configuration, continue_from_iteration=(start_status + 1) // 2, source_name_format="{0:d}", ), "mark_start": mark_start, "mark_end": mark_end, "path_substitutions": get_path_substitutions( configuration, sphotref_header ), } if configuration["master_photref_fname"] is not None: _logger.info( "Using existing master photometric reference: %s with\n\t%s", configuration["master_photref_fname"], "\n\t".join(f"{k}: {v!r}" for k, v in kwargs.items()), ) assert start_status == -1 magnitude_fitting.single_iteration( photref=magnitude_fitting.get_master_photref( configuration["master_photref_fname"] ), **kwargs, ) return None _logger.info( "Starting iterative magfit for single photref: %s with\n\t%s", configuration["single_photref_dr_fname"], "\n\t".join(f"{k}: {v!r}" for k, v in kwargs.items()), ) master_photref_fname, magfit_stat_fname = magnitude_fitting.iterative_refit( single_photref_dr_fname=configuration["single_photref_dr_fname"], catalog_sources=catalog_sources, **kwargs, ) return [ { "filename": master_photref_fname, "preference_order": None, "type": "master_photref", }, { "filename": magfit_stat_fname, "preference_order": None, "type": "magfit_stat", }, { "filename": catalog_fname, "preference_order": None, "type": "magfit_catalog", }, ]
[docs] def delete_master(filename, master_type): """Delete the master from file system and database.""" # False positivie # pylint: disable=no-member with Session.begin() as db_session: # pylint: enable=no-member assert ( # False positive # pylint: disable=not-callable db_session.scalar( select(func.count()) .select_from(MasterFile) .filter_by(filename=filename) ) == 0 # pylint: enable=not-callable ) if os.path.exists(filename): _logger.warning( "Removing potentially partial %s file '%s'!", master_type, filename, ) os.remove(filename)
[docs] def check_no_master(filename, master_type): """Raise an exception if the given master exists.""" if os.path.exists(filename): raise RuntimeError( f"{master_type} file {filename!r} should not exist!" "Cleaning up interrupted magnitude fitting failed!" )
[docs] def clean_dr(dr_fname, dr_substitutions): """Remove a magfit iteration from the given DR file.""" _logger.warning( "Deleting magfit iteration %d from '%s'!", dr_substitutions["magfit_iteration"], dr_fname, ) with DataReductionFile(dr_fname, "r+") as dr_file: dr_file.delete_dataset("shapefit.magfit.magnitude", **dr_substitutions) for aperture_index in count(): if not dr_file.check_for_dataset( "apphot.magnitude", aperture_index=aperture_index, **dr_substitutions, ): break dr_file.delete_dataset( "apphot.magfit.magnitude", aperture_index=aperture_index, **dr_substitutions, )
[docs] def cleanup_interrupted(interrupted, configuration): """Remove DR entries stats and magfit references for magfit_iteration.""" min_status = min(interrupted, key=lambda x: x[1])[1] max_status = max(interrupted, key=lambda x: x[1])[1] _logger.info( "Cleaning up interrupted magfit with status %d to %d", min_status, max_status, ) if min_status < max_status - 1 - max_status % 2: raise ValueError( "Encountered internally inconsistent interrupted status values when" " cleaning up magnitude fitting " f"(min: {min_status}, max: {max_status})!" ) with DataReductionFile( configuration["single_photref_dr_fname"], "r+" ) as single_photref_dr: sphotref_header = single_photref_dr.get_frame_header() fname_substitutions = dict(sphotref_header) dr_substitutions = get_path_substitutions( configuration, sphotref_header ) fname_substitutions.update(dr_substitutions) dr_substitutions["magfit_iteration"] = max_status // 2 for dr_fname, _ in interrupted: clean_dr(dr_fname, dr_substitutions) if configuration["master_photref_fname"] is None: for master_type in ["master_photref", "magfit_stat"]: fname_substitutions["magfit_iteration"] = max_status // 2 delete_master( configuration[master_type + "_fname_format"].format_map( fname_substitutions ), master_type, ) for iteration in range(0, max_status // 2): fname_substitutions["magfit_iteration"] = iteration check_fname = configuration[ master_type + "_fname_format" ].format_map(fname_substitutions) _logger.debug("Checking existence of %s", repr(check_fname)) assert os.path.exists(check_fname) fname_substitutions["magfit_iteration"] = max_status // 2 + 1 check_no_master( configuration[master_type + "_fname_format"].format_map( fname_substitutions ), master_type, ) return max_status - 1 - max_status % 2
[docs] def has_apphot(dr_fname, substitutions): """Check if the DR file contains a sky-to-frame transformation.""" with DataReductionFile(dr_fname, mode="r") as dr_file: try: dr_file.check_for_dataset("apphot.magnitude", **substitutions) return True except IOError: return False
[docs] def main(): """Run the step from command line.""" configuration = parse_command_line() setup_process(task="manage", **configuration) with DataReductionFile( configuration["single_photref_dr_fname"], "r+" ) as single_photref_dr: dr_substitutions = get_path_substitutions( configuration, single_photref_dr.get_frame_header() ) dr_substitutions["aperture_index"] = ( single_photref_dr.get_num_apertures(**dr_substitutions) - 1 ) fit_magnitudes( [ dr_fname for dr_fname in find_dr_fnames( configuration.pop("dr_files"), configuration.pop("magfit_only_if"), ) if has_apphot(dr_fname, dr_substitutions) ], None, configuration, ignore_progress, ignore_progress, )
if __name__ == "__main__": main()