Source code for autowisp.astrometry.transformation
#!/usr/bin/env python3
"""Define class to apply sky-to-frame transformation stored in DR files."""
from functools import partial
import numpy
from numpy.lib.recfunctions import unstructured_to_structured
from scipy.optimize import root
from configargparse import ArgumentParser, DefaultsFormatter
from autowisp import DataReductionFile
from autowisp.astrometry import map_projections
from autowisp import fit_expression
[docs]
class Transformation:
"""
A class that applies transformation stored in DR files.
Attributes:
pre_projection(callable): One of the map projections in
autowisp.astrometry.map_projections that is applied first
to transform RA, Dec -> xi, eta. The latter are then projected to
x, y.
evaluate_transformation(fit_expression.Interface): Evaluator for the
pre-projected catalog -> frame coordinates transformation.
"""
[docs]
@staticmethod
def _create_projected_arrays(sources, save_intermediate, in_place):
"""Create a numpy structured array to hold the projected sources."""
intermediate_dtype = [("xi", numpy.float64), ("eta", numpy.float64)]
if in_place:
projected = sources
else:
projected_dtype = [("x", numpy.float64), ("y", numpy.float64)]
# pylint: enable=no-member
if save_intermediate:
projected_dtype.extend(intermediate_dtype)
projected = numpy.empty(
len(numpy.atleast_1d(sources["RA"])), dtype=projected_dtype
)
intermediate = (
projected
if save_intermediate
else numpy.empty(projected.shape, intermediate_dtype)
)
return intermediate, projected
[docs]
def __init__(self, dr_fname=None, **dr_path_substitutions):
"""
Prepare to apply the transformation stored in the given DR file.
Args:
dr_fname(str): The filename of the data reduction file to read a
transformation from. If not specified, read_transformation()
must be called before using this class.
Returns:
None
"""
self.pre_projection = None
self.pre_projection_center = None
self.inverse_pre_projection = None
self.evaluate_transformation = None
self.evaluate_terms = None
self._coefficients = None
self._term_indices = None
if dr_fname is not None:
with DataReductionFile(dr_fname, "r") as dr_file:
self.read_transformation(dr_file, **dr_path_substitutions)
[docs]
def read_transformation(self, dr_file, **dr_path_substitutions):
"""Read the transformation from the given DR file (already opened)."""
self.set_transformation(
pre_projection_name=(
dr_file.get_attribute(
"skytoframe.cfg.sky_preprojection", **dr_path_substitutions
)
+ "_projection"
),
pre_projection_center=dr_file.get_attribute(
"skytoframe.sky_center", **dr_path_substitutions
),
terms_expression=dr_file.get_attribute(
"skytoframe.terms", **dr_path_substitutions
),
coefficients=dr_file.get_dataset(
"skytoframe.coefficients", **dr_path_substitutions
),
)
[docs]
def set_transformation(
self,
pre_projection_center,
terms_expression,
coefficients,
pre_projection_name="tan_projection",
):
"""Set the projection to use."""
self.pre_projection_center = pre_projection_center
self.pre_projection = partial(
getattr(map_projections, pre_projection_name),
RA=pre_projection_center[0],
Dec=pre_projection_center[1],
)
self.inverse_pre_projection = partial(
getattr(map_projections, "inverse_" + pre_projection_name),
RA=pre_projection_center[0],
Dec=pre_projection_center[1],
)
self.evaluate_terms = fit_expression.Interface(terms_expression)
self._term_indices = {
term: index
for index, term in enumerate(
self.evaluate_terms.get_term_str_list()
)
}
self._coefficients = coefficients
[docs]
def __call__(self, sources, save_intermediate=False, in_place=False):
"""
Return the projected positions of the given catalogue sources.
Args:
sources(structured numpy array or pandas.DataFrame): The
catalogue sources to project.
save_intermediate(bool): If True, the result includes the
coordinate of the pre-projection, in addition to the final frame
coordinates.
in_place(bool): If True, the input `sources` are updated with the
projected coordinates (`sources` must allow setting entries for
`x` and `y` columns, also for `xi` and `eta` if
`save_intermediate` is True).
Returns:
numpy structured array:
The projected source positions with labels 'x', 'y', and
optionally (if save_intermediate == True) the pre-projected
coordinates `xi` and `eta`. If `in_place` is True, return None.
"""
intermediate, projected = self._create_projected_arrays(
sources, save_intermediate, in_place
)
self.pre_projection(sources, intermediate)
terms = self.evaluate_terms(sources, intermediate)
for index, coord in enumerate("xy"):
projected[coord] = self._coefficients[index].dot(terms)
return None if in_place else projected
[docs]
def inverse(self, x, y, result="equatorial", **source_properties):
"""Return the sky coordinates of the given source."""
def projection_error(xi_eta):
"""Apply the (xi, eta, source preperties) -> (x, y) projection."""
source_properties["xi"], source_properties["eta"] = xi_eta
terms = self.evaluate_terms(source_properties)
return numpy.array(
[
float(coef.dot(terms)) - target
for coef, target in zip(self._coefficients, [x, y])
]
)
assert result in ("equatorial", "pre_projected", "both")
offset_term = self._term_indices["1"]
xi_term = self._term_indices["(xi)"]
eta_term = self._term_indices["(eta)"]
xi_eta_guess = numpy.linalg.solve(
numpy.array(
[
[
self._coefficients[0][xi_term],
self._coefficients[0][eta_term],
],
[
self._coefficients[1][xi_term],
self._coefficients[1][eta_term],
],
]
),
numpy.array(
[
x - self._coefficients[0][offset_term],
y - self._coefficients[1][offset_term],
]
),
)
solution = root(projection_error, xi_eta_guess)
assert solution.success
pre_projected = unstructured_to_structured(
solution.x, names=["xi", "eta"]
)
if result == "pre_projected":
return pre_projected
equatorial = numpy.empty(1, dtype=[("RA", float), ("Dec", float)])
self.inverse_pre_projection(equatorial, pre_projected)
if result == "equatorial":
return equatorial
return pre_projected, equatorial
[docs]
def parse_command_line():
"""Return the configuration for running this module as a script."""
parser = ArgumentParser(
description="Provide command line interface to the transformation in a "
"DR file",
default_config_files=[],
formatter_class=DefaultsFormatter,
ignore_unknown_config_file_keys=False,
)
parser.add_argument(
"dr_fname",
metavar="DR_FILE",
help="The data reduction file to read the transformation from.",
)
parser.add_argument(
"--skytoframe_version",
type=int,
default=0,
help="The version of the sky-to-frame transformation to use.",
)
parser.add_argument(
"--project",
metavar=("RA", "Dec"),
nargs=2,
type=float,
default=None,
help="Sky coordinates to project.",
)
parser.add_argument(
"--inverse",
metavar=("x", "y"),
nargs=2,
type=float,
default=None,
help="Frame coordinates for which to find the corresponding RA, Dec .",
)
return parser.parse_args()
[docs]
def main(config):
"""Avoid global variables."""
transformation = Transformation(
config.dr_fname, skytoframe_version=config.skytoframe_version
)
if config.project is not None:
projected = transformation(
{"RA": config.project[0], "Dec": config.project[1]}
)
print(
f"RA: {config.project[0]!r}, Dec {config.project[1]!r} -> "
f"({projected!r})"
)
if config.inverse is not None:
unprojected = transformation.inverse(
config.inverse[0], config.inverse[1], result="both"
)
print(
f"({config.inverse[0]!r}, {config.inverse[1]!r}) -> "
f"{unprojected!r}"
)
if __name__ == "__main__":
main(parse_command_line())