Source code for autowisp.source_finder

"""Uniform interface for source extractoin using several methods."""

import numpy
from astropy.io import fits
from scipy import spatial
from numpy.lib import recfunctions

from astrowisp.utils.file_utilities import (
    prepare_file_output,
    get_unpacked_fits,
)
from autowisp import source_finder_util
from autowisp.evaluator import Evaluator


# This still makes sense as a class
# pylint: disable=too-few-public-methods
[docs] class SourceFinder: """Find sources in an image of the night sky and repor properties."""
[docs] @staticmethod def _create_mock_source_list(fits_fname, configuration): """Create a source list with randomly generated properties.""" with fits.open(fits_fname) as fits_file: # False positive # pylint: disable=no-member hdu_index = 0 if fits_file[0].header["NAXIS"] else 1 xresolution = fits_file[hdu_index].header["NAXIS1"] yresolution = fits_file[hdu_index].header["NAXIS2"] # pylint: disable=no-member med_pixel = numpy.median(fits_file[hdu_index].data) nsources = 1000 result = numpy.empty( nsources, dtype=[ ( name, (numpy.int32 if name in ["id", "npix"] else numpy.float64), ) for name in source_finder_util.get_srcextract_columns("fistar") ], ) result["id"] = numpy.arange(nsources) # False positive # pylint: disable=no-member result["x"] = numpy.random.random(nsources) * xresolution result["y"] = numpy.random.random(nsources) * yresolution result["bg"] = (1.0 + 0.1 * numpy.random.random(nsources)) * med_pixel result["flux"] = ( numpy.random.random(nsources) * configuration["brightness_threshold"] ) result["amp"] = 0.2 * result["flux"] result["s/n"] = result["flux"] / result["bg"] result["s"] = 2.3 + 0.2 * numpy.random.random(nsources) result["d"] = 0.3 + 0.1 * numpy.random.random(nsources) result["k"] = 0.3 + 0.1 * numpy.random.random(nsources) result["npix"] = 20 + (5 * numpy.random.random(nsources)).astype(int) # pylint: enable=no-member return result
[docs] def _add_saturation_flags(self, fits_fname, source_list): """Add array of how many pixels near source are saturated.""" with fits.open(fits_fname, "readonly") as fits_file: saturated_kdtree = spatial.KDTree( numpy.column_stack(numpy.nonzero(fits_file[2].data)) + 0.5 ) source_coords = numpy.column_stack((source_list["y"], source_list["x"])) print( "Match radii: " + repr(numpy.sqrt(source_list["npix"] / numpy.pi)) ) saturated_in_range = saturated_kdtree.query_ball_point( source_coords, numpy.sqrt(source_list["npix"] / numpy.pi) ) print(f"Saturated in range shape: {saturated_in_range.shape}") print(f"Saturated in range: {saturated_in_range!r}") return recfunctions.append_fields( source_list, "nsatpix", [len(saturated) for saturated in saturated_in_range], usemask=False, )
[docs] def __init__( self, *, tool="hatphot", brightness_threshold=10, filter_sources="True", max_sources=0, allow_overwrite=False, allow_dir_creation=False, always_return_sources=False, ): """Prepare to use the specified tool and define faint limit.""" self.configuration = { "tool": tool, "brightness_threshold": brightness_threshold, "allow_overwrite": allow_overwrite, "allow_dir_creation": allow_dir_creation, "always_return_sources": always_return_sources, "filter_sources": filter_sources, "max_sources": max_sources, }
[docs] def __call__(self, fits_fname, source_fname=None, **configuration): """ Extract the sources from the given frame and save or return them. Args: fits_fname(str): The filename of the fits file to extract sources from. Can be packed or unpacked. source_fname(str or None): If None, the extract sources are returned as numpy. field array. Otherwise, this specifies a file to save the source extractor output. Returns: field array or None: The extracted source from the image, if source_fname was None. """ configuration = {**self.configuration, **configuration} if configuration["tool"] == "mock": return self._create_mock_source_list(fits_fname, configuration) start_extraction = getattr( source_finder_util, "start_" + configuration["tool"] ) with get_unpacked_fits(fits_fname) as unpacked_fname: extraction_args = ( unpacked_fname, configuration["brightness_threshold"], ) if source_fname: prepare_file_output( source_fname, configuration["allow_overwrite"], configuration["allow_dir_creation"], ) with open(source_fname, "wb") as destination: start_extraction(*extraction_args, destination).wait() if not configuration["always_return_sources"]: return None sources_file = source_fname else: extraction_process = start_extraction(*extraction_args) sources_file = extraction_process.stdout result = numpy.genfromtxt( sources_file, names=source_finder_util.get_srcextract_columns( configuration["tool"] ), dtype=None, deletechars="", ) if configuration["tool"] == "hatphot": result["x"] -= 0.5 result["y"] -= 0.5 if configuration["filter_sources"] != "True": result = result[ Evaluator(result)(configuration["filter_sources"]) ] result.sort(order="flux") result = numpy.flip(result) if configuration["max_sources"] > 0: result = result[: configuration["max_sources"]] if not source_fname: extraction_process.communicate() return self._add_saturation_flags(unpacked_fname, result)
# pylint: enable=too-few-public-methods