Source code for superphot_pipeline.iterative_rejection_util

"""A collection of general purpose statistical manipulations of scipy arrays."""

import scipy
import scipy.linalg

from superphot_pipeline.pipeline_exceptions import ConvergenceError

git_id = '$Id: 268e4abdc3ca6320776c791952863d516176e633 $'

#Too many arguments indeed, but most would never be needed.
#Breaking up into smaller pieces will decrease readability
#pylint: disable=too-many-arguments
#pylint: disable=too-many-locals
[docs]def iterative_rejection_average(array, outlier_threshold, average_func=scipy.nanmedian, max_iter=scipy.inf, axis=0, require_convergence=False, mangle_input=False, keepdims=False): """ Avarage with iterative rejection of outliers along an axis. Notes: A more efficient implementation is possible for median. Args: array: The array to compute the average of. outlier_threshold: Outliers are defined as outlier_threshold * (root maen square deviation around the average). Non-finite values are always outliers. This value could also be a 2-tuple with one positive and one negative entry, specifying the thresholds in the positive and negative directions separately. average_func: A function which returns the average to compute (e.g. scipy.nanmean or scipy.nanmedian), must ignore nan values. max_iter: The maximum number of rejection - re-fitting iterations to perform. axis: The axis along which to compute the average. require_convergence: If the maximum number of iterations is reached and still there are entries that should be rejected this argument determines what happens. If True, an exception is raised, if False, the last result is returned as final. mangle_input: Is this function allowed to mangle the input array. keepdims: See the keepdims argument of scipy.mean Returns: average: An array with all axes of a other than axis being the same and the dimension along the axis-th axis being 1. Each entry if of average is independently computed from all other entries. stdev: An empirical estimate of the standard deviation around the returned `average` for each pixel. Calculated as RMS of the difference between individual values and the average divided by one less than the number of pixels contributing to that particular pixel's average. Has the same shape as `average`. num_averaged: The number of non-rejected non-NaN values included in the average of each pixel. Same shape as `average`. """ working_array = (array if mangle_input else scipy.copy(array)) if isinstance(outlier_threshold, (float, int)): threshold_plus = outlier_threshold threshold_minus = -outlier_threshold else: assert len(outlier_threshold) == 2 assert outlier_threshold[0] * outlier_threshold[1] < 0 if outlier_threshold[0] > 0: threshold_plus, threshold_minus = outlier_threshold else: threshold_minus, threshold_plus = outlier_threshold iteration = 0 found_outliers = True while found_outliers and iteration < max_iter: average = average_func(working_array, axis=axis, keepdims=True) difference = working_array - average rms = scipy.sqrt( scipy.mean( scipy.square(difference), axis=axis, keepdims=True ) ) outliers = scipy.logical_or(difference < threshold_minus * rms, difference > threshold_plus * rms) found_outliers = scipy.any(outliers) if found_outliers: working_array[outliers] = scipy.nan if found_outliers and require_convergence: raise ConvergenceError( 'Computing ' + average_func.__name__ + ' with iterative rejection did not converge after ' + str(iteration) + ' iterations!' ) num_averaged = scipy.sum(scipy.logical_not(scipy.isnan(working_array)), axis=axis, keepdims=keepdims) stdev = ( scipy.sqrt( scipy.nanmean(scipy.square(working_array - average), axis=axis, keepdims=keepdims) / (num_averaged - 1) ) ) if not keepdims: average = scipy.squeeze(average, axis) return average, stdev, num_averaged
#pylint: enable=too-many-arguments #pylint: enable=too-many-locals
[docs]def iterative_rej_linear_leastsq(matrix, rhs, outlier_threshold, max_iterations=scipy.inf, return_predicted=False): """ Perform linear leasts squares fit iteratively rejecting outliers. The returned function finds vector x that minimizes the square difference between matrix.dot(x) and rhs, iterating between fitting and rejecting RHS entries which are too far from the fit. Args: matrix: The matrix defining the linear least squares problem. rhs: The RHS of the least squares problem. outlier_threshold: The RHS entries are considered outliers if they devite from the fit by more than this values times the root mean square of the fit residuals. max_iterations: The maximum number of rejection/re-fitting iterations allowed. Zero for simple fit with no rejections. return_predicted: Should the best-fit values for the RHS be returned? Returns: solution: The best fit coefficients. residual: The root mean square residual of the latest fit iteration. predicted: The predicted values for the RHS. Only available if `return_predicted==True`. """ num_surviving = rhs.size iteration = 0 fit_rhs = scipy.copy(rhs) fit_matrix = scipy.copy(matrix) while True: fit_coef, residual = scipy.linalg.lstsq(fit_matrix, fit_rhs)[:2] residual /= num_surviving if iteration == max_iterations: break outliers = (scipy.square(fit_rhs - fit_matrix.dot(fit_coef)) > outlier_threshold**2 * residual) num_surviving -= outliers.sum() fit_rhs[outliers] = 0 fit_matrix[outliers, :] = 0 if not outliers.any(): break iteration += 1 if return_predicted: return fit_coef, scipy.sqrt(residual), matrix.dot(fit_coef) return fit_coef, scipy.sqrt(residual)
#x and y are perfectly readable arguments for a fitting function. #pylint: disable=invalid-name
[docs]def iterative_rej_polynomial_fit(x, y, order, *leastsq_args, **leastsq_kwargs): """ Fit for c_i in y = sum(c_i * x^i), iteratively rejecting outliers. Args: x: The x (independent variable) in the polynomial. y: The value predicted by the polynomial (y). order: The maximum power of x term to include in the polynomial expansion. leastsq_args: Passed directly to iterative_rej_linear_leastsq(). leastsq_kwargs: Passed directly to iterative_rej_linear_leastsq(). Returns: solution: See iterative_rej_linear_leastsq() residual: See iterative_rej_linear_leastsq() """ matrix = scipy.empty((x.size, order+1)) matrix[:, 0] = 1.0 for column in range(1, order + 1): matrix[:, column] = matrix[:, column - 1] * x return iterative_rej_linear_leastsq(matrix, y, *leastsq_args, **leastsq_kwargs)
#pylint: enable=invalid-name