"""Define a class for worknig with data reduction files."""
# pylint: disable=too-many-lines
import string
from functools import partial
import logging
import numpy
import h5py
import pandas
from autowisp.multiprocessing_util import setup_process_map
from autowisp.hat.file_parsers import parse_anmatch_transformation
from autowisp.miscellaneous import RECOGNIZED_HAT_ID_PREFIXES
from autowisp.database.hdf5_file_structure import HDF5FileDatabaseStructure
git_id = "$Id: cd2a6c9ea1521813bb0f625ac3878ee3291e7f99 $"
# TODO: Add missed attributes: bg.cfg.annulus, bg.cfg.zero.
def init_dr_process(configuration):
"""Initialize the process with the given configuration."""
DataReductionFile.fname_template = configuration["data_reduction_fname"]
setup_process_map(configuration)
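# Hedged usage sketch: ``init_dr_process`` has the shape of a worker
# initializer, setting the class-level file name template before any worker
# touches a DR file. The pool setup, the template value, and the helper names
# ``process_one_frame``/``frame_list`` below are illustrative assumptions,
# not part of this module:
#
#     from multiprocessing import Pool
#
#     configuration = {"data_reduction_fname": "dr/{FNUM}.h5"}
#     with Pool(
#         processes=4,
#         initializer=init_dr_process,
#         initargs=(configuration,),
#     ) as pool:
#         pool.map(process_one_frame, frame_list)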
# The class has to satisfy many needs, hence many public methods.
# pylint: disable=too-many-public-methods
# Out of my control (most ancestors come from h5py module).
# pylint: disable=too-many-ancestors
class DataReductionFile(HDF5FileDatabaseStructure):
"""
Interface for working with the pipeline data reduction (DR) files.
Attributes:
_product(str): The pipeline key of the HDF5 product. In this case:
`'data_reduction'`
_key_io_tree_to_dr (dict): A dictionary specifying the correspondence
between the keys used in astrowisp.IOTree to store quantities and
the element key in the DR file.
_dtype_dr_to_io_tree (dict): A dictionary specifying the
correspondence between data types for entries in DR files and data
types in astrowisp.IOTree.
"""
_logger = logging.getLogger(__name__)
fname_template = None
@classmethod
def _product(cls):
return "data_reduction"
@classmethod
def _get_root_tag_name(cls):
"""The name of the root tag in the layout configuration."""
return "DataReduction"
def _prepare_source_iter(
self, dataset_key, column_substitution_name, **path_substitutions
):
"""
Return required head and tail of paths identifying source collection.
Args:
See `get_sources()`.
Returns:
str: The path to the parent group containing all source columns.
str: The string that must be in the beginning of each path for it
to be considered part of the source collection.
str: The string that must be in the end of each path for it to be
considered part of the source collection.
"""
path_substitutions[column_substitution_name] = "{column}"
self._logger.debug(
"Parsing source path: %s",
repr(self._file_structure[dataset_key].abspath),
)
parsed_path = string.Formatter().parse(
self._file_structure[dataset_key].abspath % path_substitutions
)
pre_column, verify, _, _ = next(parsed_path)
self._logger.debug("Pre_column: %s, verify: %s", pre_column, verify)
assert verify == "column"
try:
name_tail = next(parsed_path)
for i in range(1, 4):
assert name_tail[i] is None
name_tail = name_tail[0]
try:
next(parsed_path)
assert False
except StopIteration:
pass
except StopIteration:
name_tail = ""
parent, name_head = pre_column.rsplit("/", 1)
return parent, name_head, name_tail
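# Illustration of the parsing above (hedged, using a made-up layout path):
# if the %-substituted template were ``"/SrcExtract/Sources/{column}_v0"``,
# ``string.Formatter().parse()`` would yield
# ``("/SrcExtract/Sources/", "column", ...)`` followed by
# ``("_v0", None, None, None)``, so this method would return
# ``("/SrcExtract/Sources", "", "_v0")``.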
# pylint: enable=no-member
def get_dataset_creation_args(self, dataset_key, **path_substitutions):
"""See HDF5File.get_dataset_creation_args(), but handle srcextract."""
result = super().get_dataset_creation_args(
dataset_key, **path_substitutions
)
if dataset_key == "srcextract.sources":
column = path_substitutions["srcextract_column_name"]
if column.lower() in ["id", "numberpixels", "npix", "nsatpix"]:
result["compression"] = "gzip"
result["compression_opts"] = 9
else:
del result["compression"]
result["scaleoffset"] = 3
return result
def add_sources(
self,
data,
dataset_key,
column_substitution_name,
*,
parse_ids=False,
ascii_columns=(),
**path_substitutions,
):
"""
Create datasets out of the fields in an array of sources.
Args:
data(structured numpy.array): The data about the sources to
add.
dataset_key(str): The pipeline key for the dataset to add.
column_substitution_name(str): The %-substitution variable used to
distinguish between the columns in the array.
parse_ids(bool): Should self.parse_hat_source_id() be used to
translate string IDs to datasets to insert?
ascii_columns([str]): A list of column names to convert to ASCII
strings before saving.
Returns:
None
"""
def iter_data():
"""Iterate over (column name, values) of the input data."""
if hasattr(data, "dtype"):
for column_name in data.dtype.names:
yield column_name, data[column_name]
else:
yield data.index.name, data.index.array
for column_name, series in data.items():
yield column_name, series.array
for column_name, column_data in iter_data():
if column_name in ascii_columns or column_data.dtype.kind in "SUO":
column_data = column_data.astype("string_")
if parse_ids and column_name == "ID":
id_data = self.parse_hat_source_id(column_data)
for id_part in ["prefix", "field", "source"]:
self.add_dataset(
dataset_key=dataset_key,
data=id_data[id_part],
**{column_substitution_name: "hat_id_" + id_part},
**path_substitutions,
)
else:
self._logger.debug(
"Saving %s dataset of type: %s",
repr(column_name),
repr(column_data.dtype),
)
self.add_dataset(
dataset_key=dataset_key,
data=column_data,
**{column_substitution_name: column_name.replace("/", "")},
**path_substitutions,
)
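# Hedged usage sketch for ``add_sources()``: each field of a structured array
# becomes its own dataset. The constructor arguments (h5py-style name and
# mode), the use as a context manager, and the ``srcextract_version``
# substitution are illustrative assumptions; the dataset key and substitution
# name match those used by ``add_hat_astrometry()`` below.
#
#     import numpy
#
#     sources = numpy.array(
#         [(1, 10.5, 20.25), (2, 31.0, 40.75)],
#         dtype=[("ID", "i8"), ("x", "f8"), ("y", "f8")],
#     )
#     with DataReductionFile("example_dr.h5", "r+") as dr_file:
#         dr_file.add_sources(
#             sources,
#             "srcextract.sources",
#             "srcextract_column_name",
#             srcextract_version=0,
#         )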
def delete_sources(
self, dataset_key, column_substitution_name, **path_substitutions
):
"""Delete all columns of a given source collection."""
parent, name_head, name_tail = self._prepare_source_iter(
dataset_key, column_substitution_name, **path_substitutions
)
if parent not in self:
return
to_delete = []
self[parent].visit(to_delete.append)
for dset_name in to_delete:
self.delete_columns(self[parent], name_head, name_tail, dset_name)
def get_sources(
self, dataset_key, column_substitution_name, **path_substitutions
):
"""
Return a collection of sources previously stored in the DR file.
Args:
dataset_key(str): The pipeline key for the dataset to return.
column_substitution_name(str): The %-substitution variable used to
distinguish between the columns in the array.
Returns:
pandas.DataFrame:
One column for each stored source column; if an ID column is
present, it is set as the index.
"""
parent, name_head, name_tail = self._prepare_source_iter(
dataset_key, column_substitution_name, **path_substitutions
)
result = pandas.DataFrame()
self._logger.debug(
"Collecting columns frcom %s under %s, starting with %s and ending "
"with %s",
self.filename,
parent,
name_head,
name_tail,
)
self[parent].visititems(
partial(self.collect_columns, result, name_head, name_tail)
)
column_names = [colname.lower() for colname in result.columns]
for id_colname in ["id", "source_id"]:
if id_colname in column_names:
result.set_index(id_colname, inplace=True)
return result
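# Hedged usage sketch for ``get_sources()``: the stored columns come back as
# a pandas DataFrame. The constructor arguments and the
# ``srcextract_version`` substitution are illustrative assumptions.
#
#     dr_file = DataReductionFile("example_dr.h5", "r")
#     extracted = dr_file.get_sources(
#         "srcextract.sources",
#         "srcextract_column_name",
#         srcextract_version=0,
#     )
#     print(extracted[["x", "y"]].describe())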
def __init__(self, *args, **kwargs):
"""Open or create a data reduction file.
Args:
See HDF5File.__init__() for a description of the arguments.
However, instead of fname, a DataReductionFile can be specified by
the header of the frame it corresponds to (or at least a dict-like
object defining the header keywords required by the DR filename
template).
"""
if "header" in kwargs:
kwargs["fname"] = self.get_fname_from_header(kwargs["header"])
del kwargs["header"]
super().__init__(*args, **kwargs)
self._hat_id_prefixes = numpy.array(
RECOGNIZED_HAT_ID_PREFIXES,
dtype=self.get_dtype("srcproj.recognized_hat_id_prefixes"),
)
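# Hedged construction sketch: a DR file can be identified by an explicit
# ``fname`` or by the header of the frame it corresponds to, provided
# ``DataReductionFile.fname_template`` has been set (e.g. through
# ``init_dr_process()``). The ``mode`` argument follows the h5py convention
# and, together with the placeholder ``frame_header``, is an assumption here.
#
#     dr_file = DataReductionFile(header=frame_header, mode="r+")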
def get_dtype(self, element_key):
"""Return numpy data type for the element with by the given key."""
if element_key.endswith(".hat_id_prefix"):
return h5py.special_dtype(
enum=(
numpy.ubyte,
dict(
(prefix, value)
for value, prefix in enumerate(self._hat_id_prefixes)
),
)
)
result = super().get_dtype(element_key)
return result
def parse_hat_source_id(self, source_id):
"""Return the prefix ID, field number, and source number."""
if hasattr(source_id, "dtype") and source_id.shape:
id_data = {
id_part: numpy.empty((len(source_id),), dtype=id_dtype)
for id_part, id_dtype in [
("prefix", self.get_dtype(".hat_id_prefix")),
("field", numpy.uint16),
("source", numpy.uint32),
]
}
for source_index, this_id in enumerate(source_id):
(
id_data["prefix"][source_index],
id_data["field"][source_index],
id_data["source"][source_index],
) = self.parse_hat_source_id(this_id)
return id_data
if isinstance(source_id, bytes):
c_style_end = source_id.find(b"\0")
if c_style_end >= 0:
source_id = source_id[:c_style_end].decode()
else:
source_id = source_id.decode()
prefix_str, field_str, source_str = source_id.split("-")
return (
numpy.where(self._hat_id_prefixes == prefix_str.encode("ascii"))[0][
0
],
int(field_str),
int(source_str),
)
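# Illustration of the parsing above (hedged): an ID of the form
# ``b"<prefix>-<field>-<source>"`` becomes a numeric prefix index, a field
# number, and a source number. The concrete prefix shown is an illustrative
# assumption and must appear in RECOGNIZED_HAT_ID_PREFIXES.
#
#     prefix_index, field, source = dr_file.parse_hat_source_id(
#         b"HAT-123-0004567"
#     )
#     # field == 123 and source == 4567; prefix_index is the position of
#     # b"HAT" among the recognized prefixes.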
def get_source_count(self, **path_substitutions):
"""
Return the number of sources for the given tool versions.
Args:
path_substitutions: Values to substitute in the paths to the
datasets and attributes containing shape fit information
(usually versions of various components).
Returns:
int:
The number of projected sources in the datasets reached by the
given substitutions.
"""
path_substitutions["srcproj_column_name"] = "hat_id_prefix"
return self[
self._file_structure["srcproj.columns"].abspath % path_substitutions
].len()
def get_num_apertures(self, **path_substitutions):
"""Return the number of apertures used for aperture photometry."""
num_apertures = 0
while True:
try:
self.check_for_dataset(
"apphot.magnitude",
aperture_index=num_apertures,
**path_substitutions,
)
num_apertures += 1
except IOError:
return num_apertures
assert False
def get_num_magfit_iterations(self, **path_substitutions):
"""
Return how many magnitude fitting iterations are in the file.
Args:
path_substitutions: See get_source_count().
Returns:
int:
The number of magnitude fitting iterations performed on the
set of photometry measurements identified by the
path_substitutions argument.
"""
path_substitutions["aperture_index"] = 0
path_substitutions["magfit_iteration"] = 0
for photometry_mode in ["shapefit", "apphot"]:
try:
self.check_for_dataset(
photometry_mode + ".magfit.magnitude", **path_substitutions
)
except IOError:
continue
while True:
path_substitutions["magfit_iteration"] += 1
try:
self.check_for_dataset(
photometry_mode + ".magfit.magnitude",
**path_substitutions,
)
except IOError:
break
return path_substitutions["magfit_iteration"]
def has_shape_fit(self, accept_zeropsf=True, **path_substitutions):
"""True iff shape fitting photometry exists for path_substitutions."""
try:
self.check_for_dataset("shapefit.magnitude", **path_substitutions)
return (
accept_zeropsf
or min(
self.get_attribute(
"shapefit.cfg.psf.bicubic.grid.x", **path_substitutions
).size,
self.get_attribute(
"shapefit.cfg.psf.bicubic.grid.y", **path_substitutions
).size,
)
> 2
)
except IOError:
return False
# Could not think of a reasonable way to simplify further.
# pylint: disable=too-many-locals
# pylint: disable=too-many-statements
def get_source_data(
self,
*,
magfit_iterations="all",
shape_fit=True,
apphot=True,
string_source_ids=True,
all_numeric_source_ids=False,
background=True,
**path_substitutions,
):
"""
Extract available photometry from the data reduction file.
Args:
magfit_iterations(iterable): The set of magnitude fitting
iterations to include in the result. ``0`` is the raw photometry
(i.e. no magnitude fitting), 1 is single reference frame fit, 1
is the first re-fit etc. Use ``'all'`` to get all iterations.
Negative numbers have the same interpretation as python list
indices. For example ``-1`` is the final iteration.
shape_fit(bool): Should the result include shape fit photometry
measurements? If ``True`` but no shape fit is present, the shape
fit columns are still excluded.
apphot(bool): Should the result include aperture photometry
measurements.
string_source_ids(bool): Should source IDs be formatted as
strings (True) or a set of integers (False)?
all_numeric_source_ids(bool): If integer source IDs are used,
should HAT ID prefixes be replaced by their numeric index among
the recognized prefixes, making the index fully numeric?
background(bool): Should the result include information about the
background behind the sources?
path_substitutions: See get_source_count().
Returns:
pandas.DataFrame:
The photometry information in the current data reduction file.
The columns always included are:
* ID (set as index): the source IDs in the given DR file.
Either strings (if string_source_ids) or a 1- or 3-column
composite index, depending on the ID type.
* <catalogue quantity> (dtype as needed): one entry for each
catalogue column.
* x (numpy.float64): The x coordinates of the sources
* y (numpy.float64): The y coordinates of the sources
The following columns are included if the corresponding input
argument is set to True:
* bg (numpy.float64): The background estimates for the
sources
* bg_err (numpy.float64): Error estimate for 'bg'
* bg_npix (numpy.uint): The number of pixels the background
extraction was based on.
* <phot>_mag_mfit<iter> (numpy.float64): measured magnitudes,
one column per photometry measurement and magnitude fitting
iteration. ``<phot>`` is ``shapefit`` (if the ``shape_fit``
argument is True) or ``ap<aperture index>`` (if the ``apphot``
argument is True), and ``<iter>`` runs over the requested
``magfit_iterations``.
* <phot>_mag_err_mfit<iter> (numpy.float64): Error estimates
for the corresponding magnitudes.
* <phot>_phot_flag_mfit<iter>: The quality flags for the
photometry, one column per magnitude column.
"""
def assemble_hat_id(prefix, field, source):
return f"{prefix.decode()}-{field:03d}-{source:07d}".encode("ascii")
def initialize_result():
"""Create the part of the result always included."""
result = self.get_sources(
"srcproj.columns", "srcproj_column_name", **path_substitutions
)
self._logger.debug(
"Initial source data columns: %s", repr(result.columns)
)
hat_id_components = [
"hat_id_prefix",
"hat_id_field",
"hat_id_source",
]
if string_source_ids:
if result.index.name == "source_id":
result["ID"] = numpy.vectorize(
lambda i: str(i).encode("ascii"), otypes=["O"]
)(result.index)
else:
result["ID"] = numpy.vectorize(assemble_hat_id)(
*[result[comp] for comp in hat_id_components],
otypes=["O"],
)
for id_component in hat_id_components:
del result[id_component]
result.set_index("ID", inplace=True)
elif set(hat_id_components) < set(result.columns):
if all_numeric_source_ids:
result.insert(
0, "hat_id_prefnum", len(self._hat_id_prefixes)
)
for new_id, old_id in enumerate(self._hat_id_prefixes):
result.loc[
result["hat_id_prefix"] == old_id, "hat_id_prefnum"
] = new_id
assert result["hat_id_prefnum"].max() < len(
self._hat_id_prefixes
)
hat_id_components[0] = "hat_id_prefnum"
result.set_index(hat_id_components, inplace=True)
self._logger.debug(
"Source data after formatting ID:\n%s", repr(result)
)
return result
def normalize_magfit_iterations():
"""Make sure ``magfit_iterations`` is a list of positive indices."""
if magfit_iterations != "all" and (
len(magfit_iterations) == 0 or min(magfit_iterations) >= 0
):
return magfit_iterations
all_magfit_indices = numpy.array(
[0]
+ list(
range(
1,
self.get_num_magfit_iterations(**path_substitutions)
+ 1,
)
)
)
if magfit_iterations == "all":
return all_magfit_indices
return all_magfit_indices[magfit_iterations]
def fill_background(result):
"""Fill the background entries in the result."""
for result_key, dataset_key in (
("bg", "bg.value"),
("bg_err", "bg.error"),
("bg_npix", "bg.npix"),
):
result[result_key] = self.get_dataset(
dataset_key,
expected_shape=result.shape,
**path_substitutions,
)
def fill_photometry(result):
"""Fill the photomtric measurements entries in result."""
for result_key, dataset_key_tail in (
("mag", "magnitude"),
("mag_err", "magnitude_error"),
("phot_flag", "quality_flag"),
):
for magfit_iter in magfit_iterations:
if magfit_iter == 0 or result_key != "mag":
dataset_key_middle = ""
else:
dataset_key_middle = "magfit."
path_substitutions["magfit_iteration"] = magfit_iter - 1
column_tail = f"_mfit{magfit_iter:03d}"
if shape_fit:
result["shapefit_" + result_key + column_tail] = (
self.get_dataset(
(
"shapefit."
+ dataset_key_middle
+ dataset_key_tail
),
expected_shape=result.shape,
**path_substitutions,
)
)
if apphot:
num_apertures = self.get_num_apertures(
**path_substitutions
)
for aperture_index in range(num_apertures):
result[
f"ap{aperture_index:03d}_"
+ result_key
+ column_tail
] = self.get_dataset(
(
"apphot."
+ dataset_key_middle
+ dataset_key_tail
),
expected_shape=result.shape,
aperture_index=aperture_index,
**path_substitutions,
)
shape_fit = shape_fit and self.has_shape_fit(**path_substitutions)
magfit_iterations = normalize_magfit_iterations()
result = initialize_result()
if background:
fill_background(result)
fill_photometry(result)
return result
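# Hedged usage sketch for ``get_source_data()``: gather the projected sources
# together with background and photometry columns. The version-style path
# substitutions are illustrative assumptions; which ones are required depends
# on the configured HDF5 layout.
#
#     photometry = dr_file.get_source_data(
#         magfit_iterations=[0, -1],
#         shape_fit=True,
#         apphot=True,
#         background=True,
#         srcproj_version=0,
#         shapefit_version=0,
#         apphot_version=0,
#         background_version=0,
#     )
#     print(photometry.filter(like="_mag_").head())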
def get_source_ids(self, string_source_ids=True, **path_substitutions):
"""Return the IDs of the sources in the given DR file.
Args:
string_source_ids: Should source IDs be formatted as strings
(True) or a set of integers (False)?
path_substitutions: See get_source_count().
Returns:
numpy.array:
See ID field of result in get_source_data().
"""
return self.get_source_data(
string_source_ids=string_source_ids,
magfit_iterations=[],
shape_fit=False,
apphot=False,
shape_map_variables=False,
background=False,
position=False,
**path_substitutions,
).index
def add_magnitude_fitting(
self,
*,
fitted_magnitudes,
fit_statistics,
magfit_configuration,
missing_indices,
**path_substitutions,
):
"""
Add a magnitude fitting iteration to the DR file.
Args:
fitted_magnitudes(numpy.array): The differential photometry
corrected magnitudes of the sources.
fit_statistics(dict): Summary statistics about how the fit went.
It should define at least the following keys:
``initial_src_count``, ``final_src_count``, and ``residual``.
magfit_configuration: The configuration structure with which
magnitude fitting was performed.
missing_indices: A list of indices within the file of sources
for which no entries are included in fitted_magnitudes.
Returns:
None
"""
def pad_missing_magnitudes():
"""Return fitted magnitudes with nans added at missing_indices."""
if not missing_indices:
return fitted_magnitudes
fitted_magnitudes_shape = list(fitted_magnitudes.shape)
fitted_magnitudes_shape[0] += len(missing_indices)
padded_fitted_magnitudes = numpy.empty(
shape=fitted_magnitudes_shape, dtype=fitted_magnitudes.dtype
)
padded_fitted_magnitudes[missing_indices] = numpy.nan
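# The boolean mask below selects the rows *not* listed in
# missing_indices; those rows receive the fitted magnitudes in their
# original order, while the missing rows keep the NaN fill.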
padded_fitted_magnitudes[
[
ind not in missing_indices
for ind in range(fitted_magnitudes_shape[0])
]
] = fitted_magnitudes
return padded_fitted_magnitudes
def add_magfit_datasets(fitted_magnitudes, include_shape_fit):
"""Create the datasets holding the newly fitted magnitudes."""
def add_dataset(dset_key, dset_data, substitutions):
orig_path = self.add_dataset(
dset_key,
dset_data,
if_exists="error",
**substitutions,
)
path_template = self._file_structure[dset_key].abspath
for magfit_iter in range(
num_magfit_iterations,
path_substitutions["magfit_iteration"],
):
self[
path_template
% {
**substitutions,
"magfit_iteration": magfit_iter,
}
] = self[orig_path]
num_apertures = fitted_magnitudes.shape[1]
apphot_start = 0
if include_shape_fit:
num_apertures -= 1
apphot_start = 1
add_dataset(
"shapefit.magfit.magnitude",
fitted_magnitudes[:, 0],
path_substitutions,
)
for aperture_index in range(num_apertures):
add_dataset(
"apphot.magfit.magnitude",
fitted_magnitudes[:, aperture_index + apphot_start],
{**path_substitutions, "aperture_index": aperture_index},
)
def add_attributes(include_shape_fit):
"""Add attributes with the magfit configuration."""
for phot_index in range(fitted_magnitudes.shape[1]):
phot_method = (
"shapefit"
if include_shape_fit and phot_index == 0
else "apphot"
)
if phot_method == "apphot":
path_substitutions["aperture_index"] = (
path_substitutions.get("aperture_index", -1) + 1
)
if num_magfit_iterations == 0:
self.add_attribute(
phot_method + ".magfit.cfg.correction_type",
b"linear",
if_exists="error",
**path_substitutions,
)
for pipeline_key_end, config_attribute in [
("correction", "correction_parametrization"),
("require", "fit_source_condition"),
("single_photref", "single_photref_dr_fname"),
]:
self.add_attribute(
phot_method + ".magfit.cfg." + pipeline_key_end,
getattr(magfit_configuration, config_attribute),
if_exists="error",
**path_substitutions,
)
for config_param in [
"noise_offset",
"max_mag_err",
"rej_level",
"max_rej_iter",
"error_avg",
]:
self.add_attribute(
phot_method + ".magfit.cfg." + config_param,
getattr(magfit_configuration, config_param),
if_exists="error",
**path_substitutions,
)
for pipeline_key_end, statistics_key in [
("num_input_src", "initial_src_count"),
("num_fit_src", "final_src_count"),
("fit_residual", "residual"),
]:
self.add_attribute(
phot_method + ".magfit." + pipeline_key_end,
fit_statistics[statistics_key][phot_index],
if_exists="error",
**path_substitutions,
)
num_magfit_iterations = self.get_num_magfit_iterations(
**path_substitutions
)
if "magfit_iteration" in path_substitutions:
assert (
path_substitutions["magfit_iteration"] >= num_magfit_iterations
)
else:
path_substitutions["magfit_iteration"] = num_magfit_iterations
self._logger.debug(
"Adding magfit iteration %d to %s containing %d prior iterations",
path_substitutions["magfit_iteration"],
self.filename,
num_magfit_iterations,
)
include_shape_fit = self.has_shape_fit(
accept_zeropsf=False, **path_substitutions
)
add_magfit_datasets(pad_missing_magnitudes(), include_shape_fit)
add_attributes(include_shape_fit)
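# Hedged usage sketch for ``add_magnitude_fitting()``: ``fitted_magnitudes``
# has one row per fitted source and one column per photometry measurement
# (shape fit first if present, then the apertures); each ``fit_statistics``
# entry provides one value per measurement. All concrete names and values
# below are illustrative assumptions.
#
#     dr_file.add_magnitude_fitting(
#         fitted_magnitudes=fitted_magnitudes,
#         fit_statistics={
#             "initial_src_count": initial_counts,
#             "final_src_count": final_counts,
#             "residual": residuals,
#         },
#         magfit_configuration=magfit_config,
#         missing_indices=[],
#         shapefit_version=0,
#         apphot_version=0,
#     )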
def add_hat_astrometry(
self, filenames, configuration, **path_substitutions
):
"""
Add astrometry derived by fistar and anmatch to the DR file.
Args:
filenames(dict): The files containing the astrometry results.
Should have the following keys: `'fistar'`, `'trans'`,
`'match'`, `'catalogue'`.
configuration: An object with attributes containing the
configuration of how astrometry was performed.
path_substitutions: See get_source_count()
Returns:
None
"""
def add_match(extracted_sources, catalogue_sources):
"""Create dset of the matched indices from catalogue & extracted."""
num_cat_columns = len(catalogue_sources.dtype.names)
match_ids = numpy.genfromtxt(
filenames["match"],
dtype=None,
names=["cat_id", "extracted_id"],
usecols=(0, num_cat_columns),
)
extracted_sorter = numpy.argsort(extracted_sources["ID"])
catalogue_sorter = numpy.argsort(catalogue_sources["ID"])
match = numpy.empty([match_ids.size, 2], dtype=int)
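# Translate matched IDs into row indices: searchsorted() locates each
# matched ID within the sorted ID array, and indexing the argsort result
# with that location recovers the row index in the original (unsorted)
# source array.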
match[:, 0] = catalogue_sorter[
numpy.searchsorted(
catalogue_sources["ID"],
match_ids["cat_id"],
sorter=catalogue_sorter,
)
]
match[:, 1] = extracted_sorter[
numpy.searchsorted(
extracted_sources["ID"],
match_ids["extracted_id"],
sorter=extracted_sorter,
)
]
self.add_dataset(
dataset_key="skytoframe.matched",
data=match,
**path_substitutions,
)
def add_trans():
"""Create dsets/attrs describing the sky to frame transformation."""
transformation, info = parse_anmatch_transformation(
filenames["trans"]
)
self.add_dataset(
dataset_key="skytoframe.coefficients",
data=numpy.stack(
(transformation["dxfit"], transformation["dyfit"])
),
**path_substitutions,
)
for entry in ["type", "order", "offset", "scale"]:
self.add_attribute(
attribute_key="skytoframe." + entry,
attribute_value=transformation[entry],
**path_substitutions,
)
for entry in ["residual", "unitarity"]:
self.add_attribute(
attribute_key="skytoframe." + entry,
attribute_value=info[entry],
**path_substitutions,
)
self.add_attribute(
attribute_key="skytoframe.sky_center",
attribute_value=numpy.array(
[info["2mass"]["RA"], info["2mass"]["DEC"]]
),
**path_substitutions,
)
def add_configuration():
"""Add the information about the configuration used."""
for component, config_attribute in [
("srcextract", "binning"),
("catalogue", "name"),
("catalogue", "epoch"),
("catalogue", "filter"),
("catalogue", "fov"),
("catalogue", "orientation"),
("skytoframe", "srcextract_filter"),
("skytoframe", "sky_preprojection"),
("skytoframe", "max_match_distance"),
("skytoframe", "frame_center"),
("skytoframe", "weights_expression"),
]:
if component == "catalogue":
value = getattr(
configuration, "astrom_catalogue_" + config_attribute
)
else:
value = getattr(
configuration, component + "_" + config_attribute
)
self.add_attribute(
component + ".cfg." + config_attribute,
value,
**path_substitutions,
)
extracted_sources = numpy.genfromtxt(
filenames["fistar"],
names=[
"ID",
"x",
"y",
"Background",
"Amplitude",
"S",
"D",
"K",
"FWHM",
"Ellipticity",
"PositionAngle",
"Flux",
"SignalToNoise",
"NumberPixels",
],
dtype=None,
)
catalogue_sources = numpy.genfromtxt(
filenames["catalogue"], dtype=None, names=True, deletechars=""
)
catalogue_sources.dtype.names = [
name.split("[", 1)[0] for name in catalogue_sources.dtype.names
]
self.add_sources(
extracted_sources,
"srcextract.sources",
"srcextract_column_name",
**path_substitutions,
)
self.add_sources(
catalogue_sources,
"catalogue.columns",
"catalogue_column_name",
parse_ids=True,
**path_substitutions,
)
add_match(extracted_sources, catalogue_sources)
add_trans()
add_configuration()
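# Hedged usage sketch for ``add_hat_astrometry()``: the four entries of the
# first argument point to the fistar, anmatch transformation, match, and
# catalogue files. The concrete paths, the ``configuration`` object, and the
# version substitution are illustrative assumptions.
#
#     dr_file.add_hat_astrometry(
#         {
#             "fistar": "frame.fistar",
#             "trans": "frame.trans",
#             "match": "frame.match",
#             "catalogue": "field.cat",
#         },
#         astrometry_config,
#         skytoframe_version=0,
#     )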
def get_matched_sources(self, **path_substitutions):
"""Get combined catalogue and extracted matched sources."""
match = self.get_dataset(
dataset_key="skytoframe.matched", **path_substitutions
)
catalogue = (
self.get_sources(
"catalogue.columns",
"catalogue_column_name",
**path_substitutions,
)
.iloc[match[:, 0]]
.reset_index()
)
extracted_sources = (
self.get_sources(
"srcextract.sources",
"srcextract_column_name",
**path_substitutions,
)
.iloc[match[:, 1]]
.reset_index()
)
return pandas.concat([catalogue, extracted_sources], axis=1)
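# Hedged usage sketch for ``get_matched_sources()``: each row pairs a
# catalogue entry with the extracted source it was matched to. The version
# substitution is an illustrative assumption.
#
#     matched = dr_file.get_matched_sources(skytoframe_version=0)
#     print(matched[["x", "y"]].head())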
# pylint: enable=too-many-locals
# pylint: enable=too-many-statements
# pylint: enable=too-many-ancestors
# pylint: enable=too-many-public-methods