# autowisp/light_curves/light_curve_file.py

"""Define a class for working with light curve files."""

import re

import numpy
import h5py

from autowisp.database.hdf5_file_structure import HDF5FileDatabaseStructure
from autowisp.evaluator import Evaluator
from .hashable_array import HashableArray

_config_dset_key_rex = re.compile(
    "|".join(
        [
            r"_cfg_version$",
            r".software_versions$",
            r"\.cfg\.(?!(epoch|fov|orientation))",
            r"^srcextract\.psf_map\.cfg\.",
        ]
    )
)
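
# A minimal illustration of which dataset keys the above pattern treats as
# configuration datasets (the keys shown are hypothetical examples, not an
# exhaustive list from the layout configuration):
#
#     >>> bool(_config_dset_key_rex.search("apphot.cfg.error_offset"))
#     True
#     >>> bool(_config_dset_key_rex.search("srcproj.software_versions"))
#     True
#     >>> bool(_config_dset_key_rex.search("skypos.cfg.epoch"))
#     False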


# Come from H5py.
# pylint: disable=too-many-ancestors
class LightCurveFile(HDF5FileDatabaseStructure):
    """
    Interface for working with the pipeline-generated light curve files.

    Attributes:
        _config_indices(dict):    A dictionary of the already read-in
            configuration indices (re-used if requested again).
    """

    @classmethod
    def _product(cls):
        return "light_curve"

    @classmethod
    def _get_root_tag_name(cls):
        """The name of the root tag in the layout configuration."""

        return "LightCurve"

    def _get_hashable_dataset(self, dataset_key, **substitutions):
        """Return the selected dataset with hashable entries."""

        try:
            values = self.get_dataset(dataset_key, **substitutions)
        except IOError:
            return []
        if isinstance(values[0], numpy.ndarray):
            if h5py.check_dtype(vlen=values[0].dtype) is bytes:
                return [HashableArray(numpy.array(list(v))) for v in values]
            return [HashableArray(v) for v in values]
        if values.dtype.kind == "f":
            return [v if numpy.isfinite(v) else "NaN" for v in values]
        return values

    def _get_configurations(self, component, quantities, **substitutions):
        """
        Return the configurations for a given component.

        Args:
            component:    What to return the configuration of. Should
                correspond to a configuration index variable (without the
                `.cfg_index` suffix).

            quantities:    A list of the pipeline keys identifying all
                quantities belonging to this configuration component.
                Undefined behavior results if `component` and `quantities`
                are not consistent.

            substitutions:    Substitutions required to fully resolve the
                paths to the datasets containing the configurations.

        Returns:
            A dictionary indexed by the hash of a configuration, with entries
            that are 2-tuples of:

                - the ID assigned to the configuration, and

                - a frozenset of (name, value) pairs containing the
                  configuration.

            Also stores the extracted list of configurations as
            self._configurations[component][frozenset(substitutions.items())]
        """

        def report_indistinct_configurations(config_list):
            """Report all repeating configurations in an exception."""

            message = (
                f"Identical {component!s} configurations found in "
                f"{self.filename}:\n"
            )
            hash_list = [hash(c) for c in config_list]
            for config in set(config_list):
                if config_list.count(config) != 1:
                    this_hash = hash(config)
                    message += (
                        "Indices ("
                        + ", ".join(
                            [
                                str(i)
                                for i, h in enumerate(hash_list)
                                if this_hash == h
                            ]
                        )
                        + ") contain: \n"
                    )
                    for key, value in zip(quantities, config):
                        message += f"\t {key} = {value!r}\n"
            raise IOError(message)

        substitution_set = frozenset(substitutions.items())
        if (
            component in self._configurations
            and substitution_set in self._configurations[component]
        ):
            return self._configurations[component][substitution_set]

        stored_configurations = list(
            zip(
                *[
                    self._get_hashable_dataset(pipeline_key, **substitutions)
                    for pipeline_key in quantities
                ]
            )
        )
        if len(set(stored_configurations)) != len(stored_configurations):
            report_indistinct_configurations(stored_configurations)

        stored_config_sets = [
            frozenset(zip(quantities, config))
            for config in stored_configurations
        ]
        result = {
            hash(config): (index, config)
            for index, config in enumerate(stored_config_sets)
        }
        if component not in self._configurations:
            self._configurations[component] = {}
        self._configurations[component][substitution_set] = result
        return result

    def __init__(self, *args, source_ids=None, **kwargs):
        """
        Open a lightcurve file.

        Args:
            source_ids(None or dict):    The known identifiers of this source
                in catalogues. Must be set if the lightcurve file is being
                created. If it already exists, identifiers already defined in
                the lightcurve are checked against supplied values, and new
                identifiers are added if the file is being opened for
                writing.

            args:    Passed directly to super().__init__()

            kwargs:    Passed directly to super().__init__()

        Returns:
            None
        """

        super().__init__(*args, **kwargs)
        self._configurations = {}
        self._config_indices = {}
        if "Identifiers" not in self and self.driver != "core":
            if not source_ids:
                raise ValueError(
                    "Must specify at least one identifier when creating new "
                    "lightcurve file!"
                )
            self.create_dataset(
                "Identifiers",
                (0, 2),
                maxshape=(None, 2),
                chunks=(10, 2),
                dtype=h5py.string_dtype(),
            )

        if source_ids is not None:
            add_source_ids = dict(source_ids)
            # False positive
            # pylint: disable=no-member
            stored_identifiers = dict(self["Identifiers"].asstr())
            # pylint: enable=no-member
            for catalogue, identifier in source_ids.items():
                if catalogue in stored_identifiers:
                    assert identifier == stored_identifiers[catalogue]
                    del add_source_ids[catalogue]

            # False positive
            # pylint: disable=no-member
            destination = self["Identifiers"].shape[0]
            self["Identifiers"].resize((destination + len(add_source_ids), 2))
            # pylint: enable=no-member
            for new_id in add_source_ids.items():
                self["Identifiers"][destination] = new_id
                destination += 1

    def get_config_indices(self, dataset_key, **substitutions):
        """Return the config index dset for indexing the given config dset."""

        substitution_key = frozenset(substitutions.items())
        config_component = dataset_key
        while True:
            try:
                result = self._config_indices.get(config_component)
                if result is not None:
                    result = result.get(substitution_key)
                if result is None:
                    result = self.get_dataset(
                        config_component + ".cfg_index", **substitutions
                    )
                    if config_component not in self._config_indices:
                        self._config_indices[config_component] = {}
                    self._config_indices[config_component][
                        substitution_key
                    ] = result
                return result
            except KeyError:
                config_component = config_component.rsplit(".", 1)[0]

    def read_data(self, dataset_key, **substitutions):
        """Similar to get_dataset, except config datasets are expanded."""

        data = self.get_dataset(dataset_key, **substitutions)
        if _config_dset_key_rex.search(dataset_key):
            config_indices = self.get_config_indices(
                dataset_key, **substitutions
            )
            data = data[config_indices]
        return data

    def read_data_array(self, variables):
        """
        Return a numpy structured array of the given variables.

        Args:
            variables(dict):    The variables to read. Each key is a variable
                name in the resulting array and the corresponding value is a
                2-tuple giving the dataset key to use for that variable,
                along with any substitutions required to fully resolve the
                dataset path.

        Returns:
            numpy.array:
                Array with field names the variables specified on input,
                containing the specified data. Configuration datasets are
                expanded to lightcurve points using the corresponding
                configuration index.
        """

        def result_column_dtype(dset_key):
            """The type to use for the given column in the result."""

            result = self.get_dtype(dset_key)
            if result == numpy.string_:
                return numpy.dtype("O")
            return result

        def create_empty_result(result_size):
            """Create an uninitialized array to hold the result."""

            return numpy.empty(
                result_size,
                dtype=[
                    (vname, result_column_dtype(dset_key))
                    for vname, (dset_key, subs) in variables.items()
                ],
            )

        result = None
        for var_name, (dataset_key, substitutions) in variables.items():
            data = self.read_data(dataset_key, **substitutions)
            if result is None:
                result = create_empty_result(data.size)
                first_dset = dataset_key, substitutions
            elif data.shape != result.shape:
                raise RuntimeError(
                    f"For {self.filename!r}, {dataset_key!r}: "
                    f"{substitutions!r} dataset shape {data.shape!r} does "
                    f"not match the shape {result.shape!r} of "
                    f"{first_dset[0]!r}: {first_dset[1]!r}"
                )
            result[var_name] = data
        return result
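
    # A minimal sketch of the `variables` mapping accepted by
    # read_data_array(); the dataset keys and substitution names below are
    # illustrative guesses, the real ones are defined by the HDF5 layout
    # configuration:
    #
    #     lightcurve.read_data_array({
    #         "bjd": ("skypos.BJD", {}),
    #         "mag": ("apphot.magfit.magnitude",
    #                 {"aperture_index": 0, "magfit_iteration": 0}),
    #     })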

    def add_configurations(
        self,
        component,
        configurations,
        config_indices,
        *,
        config_index_selection=None,
        **substitutions,
    ):
        """
        Add a list of configurations to the LC, merging with existing ones.

        Also updates the configuration index dataset.

        Args:
            component(str):    The component for which these configurations
                apply (i.e. it should have an associated configuration index
                dataset).

            configurations(iterable):    The configurations to add. Each
                configuration should be an iterable of 2-tuples formatted
                like (`pipeline_key`, `value`).

            config_indices(array of int):    For each frame, the
                corresponding entry is the index within configurations of
                the configuration that applies for that frame.

            config_index_selection:    Either None, a slice, or a boolean
                array selecting where in the configuration index dataset to
                set the new indices. If None, the new indices are appended
                at the end of the configuration index dataset; otherwise, it
                must select exactly as many elements as are found in
                config_indices.

            substitutions:    Any substitutions required to fully resolve
                the paths to the configuration and configuration index
                datasets.

        Returns:
            None
        """

        def get_new_data():
            """Return a dict of pipeline_key, data of the updates needed."""

            index_dset = numpy.empty(config_indices.shape, dtype=numpy.uint)
            config_keys = None
            for config_index, new_config in enumerate(configurations):
                config_hash = hash(new_config)
                if config_keys is None:
                    config_keys = [entry[0] for entry in new_config]
                    stored_configurations = self._get_configurations(
                        component, config_keys, **substitutions
                    )
                    config_data_to_add = {key: [] for key in config_keys}
                else:
                    assert len(new_config) == len(config_keys)
                    for entry in new_config:
                        # Will be set to sequence before this
                        # pylint: disable=unsupported-membership-test
                        assert entry[0] in config_keys
                        # pylint: enable=unsupported-membership-test

                if config_hash in stored_configurations:
                    index_dset[config_indices == config_index] = (
                        stored_configurations[config_hash][0]
                    )
                else:
                    # A configuration not seen before gets the next
                    # available ID.
                    new_config_id = len(stored_configurations)
                    index_dset[config_indices == config_index] = (
                        new_config_id
                    )
                    stored_configurations[config_hash] = (
                        new_config_id,
                        new_config,
                    )
                    for key, value in new_config:
                        config_data_to_add[key].append(
                            value.unwrap()
                            if isinstance(value, HashableArray)
                            else value
                        )

            for key in config_data_to_add:
                config_data_to_add[key] = numpy.array(
                    config_data_to_add[key],
                    dtype=h5py.check_dtype(
                        vlen=numpy.dtype(self.get_dtype(key))
                    ),
                )
            config_data_to_add[component + ".cfg_index"] = index_dset
            return config_data_to_add

        for pipeline_key, new_data in get_new_data().items():
            if config_index_selection is not None and pipeline_key == (
                component + ".cfg_index"
            ):
                self.add_dataset(
                    dataset_key=pipeline_key,
                    data=None,
                    if_exists="ignore",
                    unlimited=True,
                    shape=new_data.shape,
                    dtype=new_data.dtype,
                    **substitutions,
                )
                self[
                    self._file_structure[pipeline_key].abspath % substitutions
                ][config_index_selection] = new_data
            else:
                self.extend_dataset(
                    pipeline_key,
                    new_data,
                    resolve_size="actual",
                    is_config=True,
                    **substitutions,
                )
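
    # A minimal sketch of calling add_configurations(); the component name,
    # pipeline key, and value below are hypothetical, and each configuration
    # must be a hashable iterable of (pipeline_key, value) pairs (e.g. a
    # frozenset):
    #
    #     lightcurve.add_configurations(
    #         "srcextract.psf_map",
    #         configurations=[
    #             frozenset({("srcextract.psf_map.cfg.terms", "O2{x, y}")}),
    #         ],
    #         config_indices=numpy.zeros(num_new_points, dtype=int),
    #     )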

    def get_lc_length(self, **substitutions):
        """Return the number of points present in this lightcurve."""

        dataset_path = (
            self._file_structure["skypos.BJD"].abspath % substitutions
        )
        if dataset_path not in self:
            return 0
        return len(self[dataset_path])

    def extend_dataset(
        self,
        dataset_key,
        new_data,
        resolve_size=None,
        is_config=False,
        **substitutions,
    ):
        """
        Add more points to the dataset identified by dataset_key.

        If the given dataset does not exist it is created as unlimited in
        its first dimension, and matching the shape in `new_data` for the
        other dimensions.

        Args:
            dataset_key:    The key identifying the dataset to update.

            new_data:    The additional values that should be written, a
                numpy array with an appropriate data type and shape.

            resolve_size:    Should be either 'actual' or 'confirmed',
                indicating which dataset length to accept when adding new
                data. If left as `None`, an error is raised if the confirmed
                length does not match the actual length of the dataset.

            is_config:    Whether the dataset holds configurations rather
                than per-point values. Configuration datasets are always
                appended to at their current end and are never padded to the
                confirmed length.

            substitutions:    Any arguments that should be substituted in
                the dataset path.

        Returns:
            None
        """

        def get_pad_value(replace_nonfinite):
            """If replace_nonfinite is undefined, base padding on dtype."""

            if replace_nonfinite is not None:
                return replace_nonfinite
            dtype_str = self._file_structure[dataset_key].dtype
            if dtype_str == "numpy.bool_":
                return False
            if dtype_str == "numpy.float64":
                return numpy.nan
            if dtype_str == "numpy.int32":
                return numpy.iinfo(numpy.int32).min
            if dtype_str == "numpy.string_":
                return ""
            if dtype_str == "numpy.uint":
                return numpy.iinfo(numpy.uint).max
            assert False

        def add_new_data(dataset, confirmed_length):
            """Add new_data to the given dataset after confirmed_length."""

            dtype = self.get_dataset_creation_args(
                dataset_key, **substitutions
            ).get("dtype")
            if dtype is None:
                dtype = new_data.dtype
            else:
                dtype = numpy.dtype(dtype)
            self._logger.debug(
                "Adding new data after confirmed length %s:\n%s",
                repr(confirmed_length),
                repr(new_data),
            )
            data_copy = self._replace_nonfinite(
                new_data, dtype, dataset_config.replace_nonfinite
            )
            new_dataset_size = confirmed_length + len(data_copy)
            if new_dataset_size < len(dataset):
                try:
                    all_data = numpy.concatenate(
                        (dataset[:confirmed_length], data_copy)
                    )
                except Exception as exc:
                    raise IOError(
                        "Failed to read lightcurve dataset "
                        f"'{self.filename}/{dataset.name}' "
                        f"(actual length of {len(dataset):d}, "
                        f"expected {confirmed_length:d})!"
                    ) from exc
                self._logger.debug(
                    "Dataset %s length (%s) exceeds confirmed (%d) + new %s. "
                    "Recreating from scratch.",
                    dataset.name,
                    repr(dataset.shape),
                    confirmed_length,
                    repr(data_copy.shape),
                )
                self.add_dataset(
                    dataset_key=dataset_key,
                    data=all_data,
                    unlimited=True,
                    **substitutions,
                )
            else:
                pad = confirmed_length - len(dataset)
                dataset.resize(new_dataset_size, 0)
                if pad > 0:
                    self._logger.debug(
                        "Padding %s (shape %s) from %s to %s with %s",
                        repr(dataset.name),
                        repr(dataset.shape),
                        repr(confirmed_length - pad),
                        repr(confirmed_length),
                        repr(dataset_config.replace_nonfinite),
                    )
                    dataset[confirmed_length - pad : confirmed_length] = (
                        get_pad_value(dataset_config.replace_nonfinite)
                    )
                dataset[confirmed_length:] = data_copy

        dataset_config = self._file_structure[dataset_key]
        dataset_path = dataset_config.abspath % substitutions
        confirmed_length = self.get_attribute(
            "confirmed_lc_length", default_value=0
        )
        actual_length = self.get_lc_length(**substitutions)
        if confirmed_length > actual_length and resolve_size != "actual":
            raise IOError(
                f"The {self.filename} lightcurve has a length of "
                f"{actual_length}, smaller than the confirmed "
                f"length of {confirmed_length}."
            )
        if confirmed_length != actual_length:
            if not resolve_size:
                raise IOError(
                    f"The lightcurve {self.filename!r} has an actual "
                    f"length of {actual_length:d}, expected "
                    f"{confirmed_length:d}!"
                )
            if resolve_size == "actual":
                confirmed_length = actual_length
            elif resolve_size != "confirmed":
                raise IOError(
                    "Unexpected lightcurve length resolution: "
                    + repr(resolve_size)
                )
        confirmed_length = int(confirmed_length)

        if dataset_path in self:
            assert actual_length > 0
            dataset = self[dataset_path]
            add_new_data(
                dataset, len(dataset) if is_config else confirmed_length
            )
        else:
            if confirmed_length == 0 or is_config:
                data_to_add = new_data
            else:
                assert confirmed_length > 0
                data_to_add = numpy.concatenate(
                    (
                        numpy.full(
                            confirmed_length,
                            get_pad_value(dataset_config.replace_nonfinite),
                        ),
                        new_data,
                    )
                )
            self.add_dataset(
                dataset_key, data_to_add, unlimited=True, **substitutions
            )

    def confirm_lc_length(self, **substitutions):
        """Set the confirmed length of the lightcurve to match actual length."""

        self.add_attribute(
            "confirmed_lc_length", self.get_lc_length(**substitutions)
        )

    def get_num_magfit_iterations(
        self, photometry_mode, lc_points, **path_substitutions
    ):
        """
        Return how many magnitude fitting iterations are in the file.

        Args:
            photometry_mode:    The pipeline key prefix identifying the
                photometry mode whose magnitude fitting results to check
                (the dataset key used is photometry_mode +
                '.magfit.magnitude').

            lc_points:    Selection (slice or index array) of the lightcurve
                points within which to check for finite magnitude fitting
                values.

            path_substitutions:    See get_source_count().

        Returns:
            int:
                The number of magnitude fitting iterations performed on the
                set of photometry measurements identified by the
                path_substitutions argument.
        """

        path_substitutions["magfit_iteration"] = 0
        dataset_key = photometry_mode + ".magfit.magnitude"
        path_zero = (
            self._file_structure[dataset_key].abspath % path_substitutions
        )
        while True:
            path_substitutions["magfit_iteration"] += 1
            if (
                self._file_structure[dataset_key].abspath % path_substitutions
                == path_zero
            ):
                return 0
            try:
                if not numpy.isfinite(
                    self.get_dataset(dataset_key, **path_substitutions)[
                        lc_points
                    ]
                ).any():
                    break
            except IOError:
                break
        return path_substitutions["magfit_iteration"]

    def add_corrected_dataset(
        self,
        original_key,
        corrected_key,
        corrected_values,
        corrected_selection,
        **substitutions,
    ):
        """
        Add corrected values for a dataset (e.g. after EPD or TFA).

        Args:
            original_key(str):    The pipeline key identifying the original
                dataset that was corrected.

            corrected_key(str):    The pipeline key identifying the dataset
                where the corrected values should be stored.

            corrected_values:    The resulting values after the correction
                has been applied.

            corrected_selection:    Some sort of slice on the dataset that
                identifies the points which were corrected.

            substitutions:    Any arguments that need to be substituted in
                the paths of the original and corrected datasets to get a
                unique entry.

        Returns:
            None
        """

        self._logger.debug(
            "Adding to %s corrected version of %d points of %s: %s",
            repr(self.filename),
            corrected_selection.sum(),
            repr(self._file_structure[original_key].abspath % substitutions),
            repr(self._file_structure[corrected_key].abspath % substitutions),
        )
        original_dset = self[
            self._file_structure[original_key].abspath % substitutions
        ]
        self.add_dataset(
            dataset_key=corrected_key,
            data=None,
            if_exists="ignore",
            unlimited=True,
            shape=original_dset.shape,
            dtype=original_dset.dtype,
            **substitutions,
        )
        destination_config = self._file_structure[corrected_key]
        self[destination_config.abspath % substitutions][
            corrected_selection
        ] = self._replace_nonfinite(
            corrected_values,
            self.get_dataset_creation_args(
                corrected_key, **substitutions
            ).get("dtype"),
            destination_config.replace_nonfinite,
        )

    def evaluate_expression(self, variables, expression):
        """Return the values of the given expression at each LC point."""

        return Evaluator(self.read_data_array(dict(variables)))(expression)
# pylint: enable=too-many-ancestors
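

# A minimal usage sketch, assuming the inherited constructor follows the
# h5py.File (filename, mode) convention and supports use as a context
# manager; the file name, catalogue name, and identifier below are
# hypothetical:
if __name__ == "__main__":
    with LightCurveFile(
        "example_lightcurve.h5",  # hypothetical path
        "a",
        source_ids={"Gaia DR3": "1234567890123456789"},  # hypothetical ID
    ) as lightcurve:
        # The Identifiers dataset maps catalogue names to source IDs.
        print(dict(lightcurve["Identifiers"].asstr()))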