# Source code for autowisp.magnitude_fitting.base

"""Magnitude fitting interface."""

import sys
import logging
from multiprocessing import current_process
from traceback import format_exception
from abc import ABC, abstractmethod

from numpy.lib import recfunctions
import numpy

from autowisp import DataReductionFile
from autowisp.evaluator import Evaluator
from autowisp.magnitude_fitting.util import get_magfit_sources


# Could not think of a sensible way to reduce number of attributes
# pylint: disable=too-many-instance-attributes
# Still makes sense as a class.
# pylint: disable=too-few-public-methods
class MagnitudeFit(ABC):
    """
    A base class for all classes doing magnitude fitting.

    Takes care of adding fitted magnitudes to data reduction files and
    updating the database.

    Attributes:
        config: See `config` argument to __init__().

        logger: A python logging logger for emitting messages on the progress
            and status of magnitude fitting.

        _dr_fname: The name of the data reduction file currently undergoing
            magnitude fitting.

        _magfit_collector: See `magfit_collector` argument to __init__().

        _reference: See `reference` argument to __init__().

        _catalogue: See `master_catalogue` argument to __init__().

        _source_name_format: See `source_name_format` argument to __init__().
    """

    # TODO: revive once database design is complete
    def _add_fit_to_db(self, coefficients, **fit_diagnostics):
        r"""
        Record the given best fit coefficient and diagnostics in the database.

        Args:
            coefficients (iterable of values): The best fit coefficients for
                the magnitude fitting of the current data reduction file.

            fit_diagnostics: Any information about the fit that should be
                recorded in the database. The names of the arguments are
                assumed to correspond to column names in the magnitude
                fitting table.

        Returns:
            None

        For now disabled. Code from HATpipe::

            def _update_db(self, values, apind, fit_res, start_src_count,
                           final_src_count):
                if self.database is None:
                    return
                args = ((self._header['STID'],
                         self._header['FNUM'],
                         self._header['CMPOS'],
                         self._header['PROJID'],
                         self._header['SPRID'],
                         apind,
                         self.config.version,
                         start_src_count,
                         final_src_count,
                         (None if fit_res is None else float(fit_res)))
                        + tuple((None if v is None else float(v))
                                for v in values))
                statement = ('REPLACE INTO `' + self.config.dest_table +
                             '` (`station_id`, `fnum`, `cmpos`, '
                             '`project_id`, `sphotref_id`, `aperture`, '
                             '`magfit_version`, `input_src`, `non_rej_src`, '
                             '`rms_residuals`, `' +
                             '`, `'.join(self._db_columns) +
                             '`) VALUES (%s' + ', %s' * (len(args) - 1) + ')')
                self._log_to_file(
                    'Inserting into DB:\n'
                    + '\t' + statement + '\n'
                    + '\targs: ' + repr(args) + '\n'
                    + '\targ types: ' + repr([type(v) for v in args]) + '\n'
                )
                self.database(statement, args)
        """
    @abstractmethod
    def _fit(self, fit_data):
        """
        Perform a fit for the magfit correction.

        Args:
            fit_data (numpy structured array): The current photometry being
                fitted. It should contain the source information from the
                frame being fit, the photometry from the reference (and
                optionally reference position if used) and catalogue
                information for each source.

        Returns:
            [[dict]]:
                For each photometry (i.e. different aperture of PSF fitting)
                a list of dictionaries is returned containing a fit result
                dictionary for each group of sources with the following
                entries:

                :coefficients: The best fit coefficients after the rejection
                    iterations of the fit has converged.

                :residual: The residual of the latest iteration of the fit,

                :initial_src_count: The number of stars the fit started with
                    before any rejections,

                :final_src_count: The number of stars left in the fit after
                    the final rejection iteration,

                :group_id: The ID of the group of sources this fit result
                    corresponds to.
        """
    @abstractmethod
    def _apply_fit(self, phot, fit_results):
        """
        Return corrected magnitudes using best fit magfit coefficients.

        Args:
            phot: The current photometry being fit, including catalogue
                information.

            fit_results: The best fit parameters derived using _fit().

        Returns:
            numpy.array (number sources x number photometry methods):
                The magnitude fit corrected magnitudes.
        """
[docs] def _solved( self, *, data_reduction, deleted_phot_indices, num_phot, num_sources, dr_path_substitutions, ): """Return fitted mags if fit is already in DR, None otherwise.""" try: fitted = numpy.empty((num_sources, num_phot), dtype=float) phot_ind = 0 if data_reduction.has_shape_fit( accept_zeropsf=False, **dr_path_substitutions ): fitted[:, 0] = data_reduction.get_dataset( "shapefit.magfit.magnitude", expected_shape=(num_sources,), **dr_path_substitutions, ) phot_ind += 1 while phot_ind < num_phot: fitted[:, phot_ind] = data_reduction.get_dataset( "apphot.magfit.magnitude", expected_shape=(num_sources,), **dr_path_substitutions, ) phot_ind += 1 numpy.delete(fitted, deleted_phot_indices, axis=0) return fitted except (KeyError, IOError): return None
[docs] def _set_group(self, evaluator, result): """Set the fit_group column in result per grouping configuration.""" self.logger.debug("Grouping expression: %s", repr(self.config.grouping)) conditions = evaluator(self.config.grouping) self.logger.debug("Grouping conditions: %s", repr(conditions)) groups = numpy.unique(conditions) self.logger.debug("Groups: %s", repr(groups)) for group_id, group_condition in enumerate(groups): in_group = conditions == group_condition self.logger.debug( "Group %d contains %d entries", group_id, in_group.sum() ) result["fit_group"][in_group] = group_id
[docs] def _get_fit_indices(self, phot, evaluator, no_catalogue): """ Return a list of the indices within phot of sources to use for mag fit. Exclude sources based on their catalague information. Args: phot: See return of DataReductionFile.get_photometry(). evaluator(Evaluator): An object capable of evaluating expressions involving fields from phot. no_catalogue: The list of source indices for which no catalogue information is available (all are rejected). Returns: [int]: A list of the indices within phot of the sources which pass all catalogue requirements for inclusion in the magnitude fit. int: The number of dropped sources. tuple: The ID of one of the sources dropped, None if no sources were dropped. """ include_flag = evaluator(self.config.fit_source_condition) include_flag[no_catalogue] = False result = include_flag.nonzero()[0] num_skipped = len(phot["source_id"]) - result.size if num_skipped: first_skipped = numpy.logical_not(include_flag).nonzero()[0][0] skipped_example = phot["source_id"][first_skipped] else: skipped_example = None return result, num_skipped, skipped_example
[docs] def _match_to_reference(self, phot, no_catalogue, evaluator): """ Add photometric reference information, and filter per self.config. Args: phot: See return of add_catalogue_info() no_catalogue (list): The source indices from phot for which no catalogue information was available, so default values were used. All identified sources are omitted from the result. evaluator (Evaluator): An object capable of evaluating expressions involving photometry and catalogue columns. Returns: a photometry structure like phot but for each aperture 'ref mag' is added - the magnitude the source has in the reference - and sources that should not be used in the fit because they are not in the reference or do not satisfy the criteria based on catalogue quantities are removed. """ def initialize_result(): """Return an empty result structure.""" dtype = [ (field[0], field[1][0]) for field in phot.dtype.fields.items() ] if self.config.reference_subpix: dtype.extend( [("x_ref", numpy.float64), ("y_ref", numpy.float64)] ) dtype.extend( [ ("ref_mag", numpy.float64, phot["mag"][0].shape), ("ref_mag_err", numpy.float64, phot["mag_err"][0].shape), ] ) print("Result dtype: " + repr(dtype)) return numpy.empty(phot.shape, dtype=dtype) result = initialize_result() not_in_ref = [0, None] fit_indices, num_skipped, skipped_example = self._get_fit_indices( phot, evaluator, no_catalogue ) result_ind = 0 for phot_ind in fit_indices: source_id = phot["source_id"][phot_ind] if source_id.shape != (): source_id = tuple(source_id) ref_info = self._reference.get(source_id) if ref_info is None: if not_in_ref[0] == 0: not_in_ref[1] = result["source_id"][result_ind] not_in_ref[0] += 1 continue for colname in phot.dtype.names: result[colname][result_ind] = phot[colname][phot_ind] if self.config.reference_subpix: result["x_ref"][result_ind] = ref_info["x"] result["y_ref"][result_ind] = ref_info["y"] result["ref_mag"][result_ind] = ref_info["mag"] result["ref_mag_err"][result_ind] = ref_info["mag_err"] 
result_ind += 1 if result_ind == 0: print(repr(not_in_ref[1])) self.logger.error( ( "All %d sources discarded from %s: %d skipped " "(example %s), %d not in the %d sources of the reference " "(example %s.), fit source condition: %s" ), len(phot["source_id"]), self._dr_fname, num_skipped, ( self._source_name_format.format(skipped_example) if skipped_example is not None else "-" ), not_in_ref[0], len(self._reference.keys()), ( self._source_name_format.format(not_in_ref[1]) if not_in_ref[1] is not None else "-" ), self.config.fit_source_condition, ) return result[:result_ind], fit_indices
    # TODO: revive once database design is complete
    def _update_calib_status(self):
        """
        Record in the database that the current header has been magfitted.

        For now disabled. Code from HATpipe::

            self.database(
                'UPDATE `' + raw_db_table(self._header['IMAGETYP']) +
                '` SET `calib_status`=%s WHERE `station_id`=%s AND `fnum`=%s '
                'AND `cmpos`=%s',
                (
                    self.config.calib_status,
                    self._header['STID'],
                    self._header['FNUM'],
                    self._header['CMPOS']
                )
            )
        """
    # TODO: Is this necessary?
    def _downgrade_calib_status(self):
        r"""
        Deal with bad photometry for a frame.

        Decrements the calibration status of the given file to astrometry and
        deletes the raw photometry file.

        For now disabled. Code from HATpipe::

            sys.stderr.write(
                'bad photometry encountered: ' + str(self._header) + '\n'
            )
            if(self.database is None):
                return
            self._log_to_file(
                'Downgrading status of header: ' + str(self._header) + '\n'
            )
            sys.stderr.write(
                'downgrading calibration status of '
                + str(self._header) + '\n'
            )
            self.database(
                'UPDATE `' + raw_db_table(self._header['IMAGETYP']) +
                '` SET `calib_status`=%s WHERE `station_id`=%s AND '
                '`fnum`=%s AND `cmpos`=%s',
                (
                    object_status['good_astrometry'],
                    self._header['STID'],
                    self._header['FNUM'],
                    self._header['CMPOS']
                )
            )
            sys.stderr.write('removing:' + self._fit_file + '\n')
            os.remove(self._fit_file)
        """
[docs] @staticmethod def _combine_fit_statistics(fit_results): """ Combine the statistics summarizing how the fit went from all groups. Properly combines values from the individual group fits into single numbers for each photometry method. The quantities processed are: residual, initial_src_count, final_src_count. Args: fit_results: The best fit results for this photometry. Returns: dict: The derived fit statistics. Keys are residual, initial_src_count, and final_src_count, with one entry for each input photometry method. """ num_photometries = len(fit_results) result = { "residual": numpy.empty(num_photometries, numpy.float64), "initial_src_count": numpy.zeros(num_photometries, numpy.int_), "final_src_count": numpy.zeros(num_photometries, numpy.int_), } for phot_ind, phot_result in enumerate(fit_results): for group_result in phot_result: for key in ["initial_src_count", "final_src_count"]: result[key][phot_ind] += group_result[key] result["residual"][phot_ind] = numpy.nanmedian( [ group_result["residual"] or numpy.nan for group_result in phot_result ] ) return result
[docs] def __init__( self, *, reference, # master_catalogue, config, magfit_collector=None, source_name_format, ): """ Initializes a magnditude fitting object. Args: reference(dict): the reference against which fitting is done. Should be indexed by source and contain entries implementing the dict interface with keys 'mag', 'mag_err' and optionally 'x' and 'y' if the sub-pixel position of the source in the reference is to be used in magnitude fitting. #master_catalogue(pandas.DataFrame): should be indexed by source # id and contain relevant catalog information. config: An object with attributes configuring how to perform magnitude fitting. It should provide at least the following attributes: * fit_source_condition: An expression involving catalogue, reference and/or photometry variables which evaluates to zero if a source should be excluded and any non-zero value if it should be included in the magnitude fit. * reference_subpix: Should the magnitude fitting correction depend on the sub-pixel position of the source in the reference frame. * grouping: An expressions using catalogue, and/or photometry variables which evaluates to a tuple of boolean values. Each distinct tuple defines a separate fitting group (i.e. a group of sources which participate in magnitude fitting together, excluding sources belonging to other groups). magfit_collector(MasterPhotrefCollector): Object collecting fitted magnitedes for generating statistics of the scatter after magnitude fitting. """ self.config = config self._dr_fname = None self._magfit_collector = magfit_collector self._reference = reference # self._catalogue = master_catalogue self._source_name_format = source_name_format self.logger = None
[docs] def __call__(self, dr_fname, mark_start, mark_end, **dr_path_substitutions): """ Performs the fit for the latest magfit iteration for a single frame. Args: dr_fname: The name of the data reductio file to fit. dr_path_substitutions: See path_substitutions argument to DataReduction.get_source_data(). Returns: array: The non-rejected photometry for the frame. array: The magfit corrected non-rejected photometry for the frame. """ self.logger = logging.getLogger(__name__) try: self.logger.debug( "Process %d fitting: %s.", current_process().pid, dr_fname ) with DataReductionFile(dr_fname, mode="r+") as data_reduction: self._dr_fname = dr_fname phot = get_magfit_sources( data_reduction, magfit_iterations=[-1], **dr_path_substitutions, ) self.logger.debug("Starting photometry: %s", repr(phot)) self.logger.debug( "Starting photometry columns: %s", repr(phot.dtype.names) ) if not phot.size: self.logger.warning("Downgrading calib status.") self._downgrade_calib_status() return None, None # TODO: revive filtering by catalogue no_catalogue, deleted_phot_indices = [], [] if getattr(self.config, "grouping", None) is not None: phot = recfunctions.append_fields( phot, ["fit_group"], [[]], usemask=False ) self.logger.debug( "Photometry columns: %s", repr(phot.dtype.names) ) evaluator = Evaluator(phot) if self.config.grouping is not None: self._set_group(evaluator, phot) self.logger.debug("Checking for existing solution.") fitted = self._solved( data_reduction=data_reduction, deleted_phot_indices=deleted_phot_indices, num_phot=phot["mag"].shape[2], num_sources=phot["mag"].shape[0], dr_path_substitutions=dr_path_substitutions, ) self.logger.debug("Matching to reference.") fit_base, fit_indices = self._match_to_reference( phot, no_catalogue, evaluator ) if fitted: return phot[fit_indices], fitted[fit_indices] if fit_base.size > 0: self.logger.debug("Performing linear fit.") fit_results = self._fit(fit_base) if fit_results: self.logger.debug("Post-processing fit.") fitted = 
self._apply_fit(phot, fit_results) assert fitted.shape == ( phot["mag"].shape[0], phot["mag"].shape[2], ) self.logger.debug("Adding to DR file.") mark_start(dr_fname) data_reduction.add_magnitude_fitting( fitted_magnitudes=fitted, fit_statistics=self._combine_fit_statistics( fit_results ), magfit_configuration=self.config, missing_indices=deleted_phot_indices, **dr_path_substitutions, ) mark_end(dr_fname) self.logger.debug("Updating calibration status.") return phot[fit_indices], fitted[fit_indices] return None, None except Exception as ex: # Does not make sense to avoid building message. # pylint: disable=logging-not-lazy self.logger.critical( str(ex) + "\n" + "".join(format_exception(*sys.exc_info())) + "\nBad DR:" + dr_fname ) # pylint: enable=logging-not-lazy raise
# pylint: enable=too-many-instance-attributes # pylint: enable=too-few-public-methods