Source code for autowisp.fit_expression.iterative_fit

"""Define a function doing iterative re-fitting of terms from an expression."""

import logging
from traceback import format_exc

import numpy
import scipy.linalg


# pylint: disable=too-many-locals
[docs] def iterative_fit_qr( weighted_predictors, weighted_qrp, weighted_target, *, weights=None, max_downdates=20, error_avg, rej_level, max_rej_iter, fit_identifier, pre_reject=False, ): """ Same as iterative_fit() but using the QR decomposition of predictors. Args: weighted_predictors(2-D array): The matrix of predictors, with weights already applied (used for calculating residuals and if too many poinst get rejected, see `max_downdates`). weighted_qrp(2-D array): The QR decomposition of `weighted_predictors` with pivoting. Should be exactly the output of `scipy.linalg.qr(weighted_predictors, pivoting=True)`, possibly with `mode='economic'`. weighted_target(1-D array): The vector of values to reproduce as a linear combination of predictors. Should also already be weighted. max_downdates: See iterative_fit(). weights: See iterative_fit(). Only used for outlier detection. error_avg: See iterative_fit(). rej_level: See iterative_fit(). max_rej_iter: See iterative_fit(). fit_identifier: See iterative_fit(). Returns: See iterative_fit_qr() """ logger = logging.getLogger(__name__) def rejected_indices(weighted_fit_diff, weights): """Return indices of outlier sources and squared fit residual.""" logger.debug("Weigthed difference: %s", repr(weighted_fit_diff)) logger.debug("Weigths: %s", repr(weights)) assert weights is None or (weights > 0).all() assert numpy.isfinite(weighted_fit_diff).all() fit_diff2 = pow( weighted_fit_diff / (1.0 if weights is None else weights), 2 ) logger.debug("Square difference: %s", repr(fit_diff2)) if error_avg == "weightedmean": res2 = numpy.mean(pow(weighted_fit_diff, 2)) if weights is not None: res2 /= numpy.mean(pow(weights, 2)) else: res2 = getattr(numpy, error_avg)(fit_diff2) max_diff2 = rej_level**2 * res2 logger.debug("max square difference: %s", repr(max_diff2)) if res2 < 0: logger.debug( "%s", "\n".join( [ repr(fit_identifier), "\tNegative square residual: " + repr(res2), ( "\tWeights (min, max): (" + repr(1.0 if weights is None else min(weights)) + ", " + repr(1.0 if weights is None else max(weights)) + ")" ), ( "\tweighted_fit_diff (min, max): (" + repr(min(weighted_fit_diff)) + ", " + repr(max(weighted_fit_diff)) + ")" ), ] ), ) return (fit_diff2 > max_diff2).nonzero()[0], res2 num_free_coef = len(weighted_predictors) if 0.0 <= max_downdates < 1: max_downdates = int(numpy.around(max_downdates * weighted_target.size)) bad_ind = numpy.logical_not(numpy.isfinite(weighted_target)) if weights is not None: bad_ind = numpy.logical_or(bad_ind, weights <= 0) bad_ind = bad_ind.nonzero()[0] permutation = numpy.argsort(weighted_qrp[2]) logger.debug("Original QRP = %s", repr(weighted_qrp)) for rej_iter in range(-1 if pre_reject else 0, max_rej_iter + 1): weighted_target = numpy.delete(weighted_target, bad_ind) if len(weighted_target) < num_free_coef: logger.critical( "Iteration %d has too few sources: %d to fit for %d " "coefficients.\n", rej_iter, len(weighted_target), num_free_coef, ) return None, None, 0 weighted_predictors = numpy.delete(weighted_predictors, bad_ind, 1) if weights is not None: weights = numpy.delete(weights, bad_ind) logger.debug( "Iteration %d, %d sources, %d coefficients\n", rej_iter, len(weighted_target), num_free_coef, ) if bad_ind.size > max_downdates: logger.debug("Rederiving QR-Decomposition") # False positive # pylint: disable=unexpected-keyword-arg weighted_qrp = scipy.linalg.qr( weighted_predictors.T, mode="economic", pivoting=True ) permutation = numpy.argsort(weighted_qrp[2]) # pylint: enable=unexpected-keyword-arg else: logger.debug("Downdating QR-Decomposition") for i in numpy.flip(bad_ind): # False positive # pylint: disable=no-member weighted_qrp = ( *scipy.linalg.qr_delete(*weighted_qrp[:2], i), weighted_qrp[2], ) # pylint: enable=no-member logger.debug( "After deleting row %d, QRP = %s", i, repr(weighted_qrp) ) if rej_iter < 0: best_fit_coef = numpy.zeros(num_free_coef) else: try: # False positive # pylint: disable=no-member best_fit_coef = scipy.linalg.solve_triangular( weighted_qrp[1], numpy.dot(weighted_qrp[0].T, weighted_target), )[permutation] # pylint: enable=no-member except scipy.linalg.LinAlgError: logger.critical( "Linear least squares fit failed:\n%s", format_exc() ) return None, None, 0 bad_ind, fit_res2 = rejected_indices( numpy.dot(best_fit_coef, weighted_predictors) - weighted_target, weights, ) logger.debug( "Fit: coef = %s, square residual = %s, %d rejected", repr(best_fit_coef), repr(fit_res2), bad_ind.size, ) if bad_ind.size == 0 and rej_iter >= 0: return best_fit_coef, fit_res2, len(weighted_target) return best_fit_coef, fit_res2, len(weighted_target)
# pylint: enable=too-many-locals
[docs] def iterative_fit( predictors, target_values, *, max_downdates=20, weights=None, error_avg, rej_level, max_rej_iter, fit_identifier, pre_reject=False, ): """ Find least squares coefficients reproducing target_values using predictors. Args: predictors: The derivatives w.r.t. to the fit coefficients of the predicted values (i.e. the matrix defining the fitting problem, apart from weighting). target_values: The values wey are trying to reproduce. max_downdates(float or int): The maximum number of deletions to handle by downdating the QR decomposition, either as a fraction of the size of `weighted_target` (if `max_downdates` is a floating point between 0 and 1), or as an absolute value (if `max_downdates` is a positive integer). If an operation results in a larger number of deletions, the original matrix is re-constructed, the corresponding rows are deleted and a new QR decomposition is derived from scratch. weights: The weight to give to each entry in `target_values`. If None, no weighting is done. phot_ind: The index of the photometry being fit (only used for reporting errors). error_avg(str): How to average fitting residuals for outlier rejection. Should be a scipy/numpy top-level function (e.g. mean, nanmedian etc). rej_level(float): How far away from the fit should a point be before it is rejected in units of error_avg. max_rej_iter: The maximum number of rejection/re-fitting iterations to perform. If the fit has not converged by then, the latest iteration is accepted. fit_identifier: Identifier of what is being fit. Only used in logging messages. pre_reject: Should a rejection iteration be performed before even the first fit is attempted (i.e. discard outliers even from the first fit). Returns: numpy.array: The best fit coefficients. float: The average square residual of the best fit. int: The number of non-rejected points used in the last fit iteration. """ if weights is not None: predictors = numpy.multiply(predictors, weights) target_values = numpy.multiply(target_values, weights) logging.getLogger(__name__).debug( "Performing QR decomposition for predictors of shape: %s", repr(predictors.shape), ) # False positive # pylint: disable=unexpected-keyword-arg qrp = scipy.linalg.qr(predictors.T, mode="economic", pivoting=True) # pylint: enable=unexpected-keyword-arg return iterative_fit_qr( predictors, qrp, target_values, weights=weights, max_downdates=max_downdates, error_avg=error_avg, rej_level=rej_level, max_rej_iter=max_rej_iter, fit_identifier=fit_identifier, pre_reject=pre_reject, )