#!/usr/bin/env python3
# pylint: disable=too-many-lines
"""Utilities for querying catalogs for astrometry."""
from os import path, makedirs
import logging
from hashlib import md5
from contextlib import nullcontext
import numpy
import pandas
from astropy import units
from astropy.io import fits
from astroquery.gaia import GaiaClass, conf
from autowisp import Evaluator, DataReductionFile
from autowisp.astrometry import Transformation
from autowisp.astrometry.map_projections import (
gnomonic_projection,
inverse_gnomonic_projection,
)
if __name__ == "__main__":
import doctest
from configargparse import ArgumentParser, DefaultsFormatter
from matplotlib import pyplot
_logger = logging.getLogger(__name__)
[docs]
class WISPGaia(GaiaClass):
"""Extend queries with condition and sorting."""
[docs]
def _get_result(self, query, add_propagated, verbose=False):
"""Get and format the result as specified by user."""
job = self.launch_job_async(query, verbose=verbose)
result = job.get_results()
_logger.debug("Gaia query result: %s", repr(result))
_logger.debug("Gaia query result columns: %s", repr(result.colnames))
if result.colnames == ["num_obj"]:
return result["num_obj"][0]
result.rename_column("ra", "ra_orig")
result.rename_column("dec", "dec_orig")
for colname in result.colnames:
new_colname = colname.lower()
if colname != new_colname:
result.rename_column(colname, colname.lower())
if add_propagated:
propagated = {
coord: numpy.empty(len(result)) for coord in ["RA", "Dec"]
}
for i, pos in enumerate(result["propagated"]):
for coord, value_str in zip(
["RA", "Dec"], pos.strip().strip("()").split(",")
):
propagated[coord][i] = float(value_str) * 180.0 / numpy.pi
result.remove_column("propagated")
for coord in add_propagated:
result.add_column(propagated[coord], name=coord)
return result
# pylint: disable=too-many-locals
[docs]
def query_object_filtered(
self,
*,
ra,
dec,
width,
height,
order_by,
condition=None,
epoch=None,
columns=None,
order_dir="ASC",
max_objects=None,
verbose=False,
count_only=False,
):
"""
Get GAIA sources within a box satisfying given condition (ADQL).
Args:
ra: The RA of the center of the box to query, with units.
dec: The declination of the center of the box to query,
with units.
width: The width of the box (half goes on each side of ``ra``),
with units.
height: The height of the box (half goes on each side of
``dec``), with units.
order_by(str): How should the stars be ordered.
condition(str): Condition the returned sources must satisfy
(typically imposes a brightness limit)
epoch: The epoch to propagate the positions to. If unspecified,
no propagation will be done.
columns(iterable): List of columns to select from the catalog. If
unspecified, all columns will be returned.
order_dir(str): Should the order be ascending (``'ASC'``) or
descending (``'DESC'``).
max_objects(int): Maximum number of objects to return.
verbose(bool): Use verbose mode when submitting querries to GAIA.
count_only(bool): If ``True``, only the number of objects is
returned without actually fetching the data.
Returns:
astropy Table:
The result of the query.
"""
if count_only:
columns = "COUNT(*) AS num_obj"
elif columns is None:
columns = "*"
else:
add_propagated = []
for coord in ["RA", "Dec"]:
try:
add_propagated.append(coord)
except ValueError:
pass
columns = ", ".join(map(str, columns))
if "*" in columns and not count_only:
add_propagated = ["RA", "Dec"]
if epoch is not None:
epoch = epoch.to_value(units.yr)
columns = (
"EPOCH_PROP_POS(ra, dec, parallax, pmra, pmdec, "
f"radial_velocity, ref_epoch, {epoch}) AS propagated, "
) + columns
corners = numpy.empty(shape=(4,), dtype=[("RA", float), ("Dec", float)])
corners_xi_eta = numpy.empty(
shape=(4,), dtype=[("xi", float), ("eta", float)]
)
width = width.to_value(units.deg)
height = height.to_value(units.deg)
corners_xi_eta[0] = (-width / 2, -height / 2)
corners_xi_eta[1] = (-width / 2, height / 2)
corners_xi_eta[2] = (width / 2, height / 2)
corners_xi_eta[3] = (width / 2, -height / 2)
inverse_gnomonic_projection(
corners,
corners_xi_eta,
RA=ra.to_value(units.deg),
Dec=dec.to_value(units.deg),
)
table_name = self.MAIN_GAIA_TABLE or conf.MAIN_GAIA_TABLE
select = "SELECT"
if max_objects is not None:
select += f" TOP {max_objects}"
query_str = f"""
{select}
{columns}
FROM {table_name}
WHERE
1 = CONTAINS(
POINT(
{self.MAIN_GAIA_TABLE_RA},
{self.MAIN_GAIA_TABLE_DEC}
),
POLYGON(
{corners[0]['RA']},
{corners[0]['Dec']},
{corners[1]['RA']},
{corners[1]['Dec']},
{corners[2]['RA']},
{corners[2]['Dec']},
{corners[3]['RA']},
{corners[3]['Dec']}
)
)
"""
if condition is not None:
query_str += f"""
AND
({condition})
"""
if not count_only:
query_str += f"""
ORDER BY
{order_by}
{order_dir}
"""
return self._get_result(
query_str,
epoch is not None and not count_only and add_propagated,
verbose,
)
# pylint: enable=too-many-locals
[docs]
def query_brightness_limited(
self, *, magnitude_expression, magnitude_limit, **query_kwargs
):
"""
Get sources within a box and a range of magnitudes.
Args:
magnitude_expression: Expression for the relevant magnitude
involving Gaia columns.
magnitude_limit: Either upper limit or lower and upper limit on
the magnitude defined by ``magnitude_expression``.
**query_kwargs: Arguments passed directly to
`query_object_filtered()`.
Returns:
astropy Table:
The result of the query.
"""
_logger.debug(
"Querying Gaia for sources with magnitude: %s, "
"limits: %s, and kwargs: %s",
repr(magnitude_expression),
repr(magnitude_limit),
repr(query_kwargs),
)
if query_kwargs.get("columns", False):
query_kwargs["columns"] = query_kwargs["columns"] + [
f"({magnitude_expression}) AS magnitude"
]
else:
query_kwargs["columns"] = [
f"({magnitude_expression}) AS magnitude",
"*",
]
if "order_by" not in query_kwargs:
query_kwargs["order_by"] = "magnitude"
query_kwargs["order_dir"] = "ASC"
if magnitude_limit is not None:
try:
min_mag, max_mag = magnitude_limit
condition = (
f"({magnitude_expression}) > {min_mag} AND "
f"({magnitude_expression}) < {max_mag}"
)
except ValueError:
condition = f"{magnitude_expression} < {magnitude_limit[0]}"
except TypeError:
condition = f"{magnitude_expression} < {magnitude_limit}"
if "condition" in query_kwargs:
query_kwargs["condition"] = (
f'({query_kwargs["condition"]}) AND ({condition})'
)
else:
query_kwargs["condition"] = condition
return self.query_object_filtered(**query_kwargs)
gaia = WISPGaia()
# This comes from astroquery (not in our control)
# pylint: disable=invalid-name
gaia.MAIN_GAIA_TABLE = "gaiadr3.gaia_source"
# pylint: enable=invalid-name
[docs]
def create_catalog_file(fname, overwrite=False, **query_kwargs):
"""
Create a catalog FITS file from a Gaia query.
Args:
fname(str): Name of the catalog file to create.
**query_kwargs: Arguments passed directly to
`gaia.query_brightness_limited()`.
"""
query = gaia.query_brightness_limited(**query_kwargs)
if query_kwargs.get("count_only", False):
print("Number of sources: ", repr(query))
return
for colname in [
"DESIGNATION",
"phot_variable_flag",
"datalink_url",
"epoch_photometry_url",
"libname_gspphot",
]:
try:
query[colname] = query[colname].astype(str)
except KeyError:
pass
query.meta["CATALOG"] = "Gaia"
query.meta["CATVER"] = gaia.MAIN_GAIA_TABLE
for k in ["ra", "dec", "width", "height"]:
query.meta[k.upper()] = query_kwargs[k].to_value(units.deg)
query.meta["EPOCH"] = (
query_kwargs["epoch"].to_value(units.yr)
if query_kwargs["epoch"] is not None
else None
)
query.meta["MAGEXPR"] = query_kwargs["magnitude_expression"]
if query_kwargs.get("magnitude_limit") is not None:
try:
(query.meta["MAGMIN"], query.meta["MAGMAX"]) = query_kwargs[
"magnitude_limit"
]
except ValueError:
query.meta["MAGMAX"] = query_kwargs["magnitude_limit"][0]
except TypeError:
query.meta["MAGMAX"] = query_kwargs["magnitude_limit"]
if path.dirname(fname) and not path.exists(path.dirname(fname)):
makedirs(path.dirname(fname))
query.write(fname, format="fits", overwrite=overwrite)
[docs]
def read_catalog_file(
cat_fits,
filter_expr=None,
sort_expr=None,
return_metadata=False,
add_gnomonic_projection=False,
):
"""
Read a catalog FITS file.
Args:
cat_fits(str, or opened FITS file): The file to read.
Returns:
pandas.DataFrame:
The catalog information as columns.
"""
if isinstance(cat_fits, str):
with fits.open(cat_fits) as opened_cat_fits:
return read_catalog_file(
opened_cat_fits,
filter_expr,
sort_expr,
return_metadata,
add_gnomonic_projection,
)
fixed_dtype = cat_fits[1].data.dtype.newbyteorder("=")
result = pandas.DataFrame.from_records(
cat_fits[1].data.astype(fixed_dtype), index="source_id"
)
if return_metadata or add_gnomonic_projection:
metadata = cat_fits[1].header
cat_eval = Evaluator(result)
if sort_expr is not None:
sort_val = cat_eval(sort_expr)
if filter_expr is not None:
print("Filter expression: " + repr(filter_expr))
filter_val = cat_eval(filter_expr)
print("Filter val: " + repr(filter_val))
filter_val = filter_val.astype(bool)
result = result.loc[filter_val]
if sort_expr is not None:
sort_val = sort_val[filter_val]
if sort_expr is not None:
result = result.iloc[numpy.argsort(sort_val)]
if add_gnomonic_projection:
if "xi" not in result.columns:
assert "eta" not in result.columns
for colname in ["xi", "eta"]:
result.insert(len(result.columns), colname, numpy.nan)
gnomonic_projection(
result, result, RA=metadata["RA"], Dec=metadata["Dec"]
)
if return_metadata:
return result, metadata
return result
[docs]
def parse_command_line():
"""Return configuration of catalog to create."""
parser = ArgumentParser(
description="Create a catalog file from a Gaia query.",
default_config_files=[],
formatter_class=DefaultsFormatter,
ignore_unknown_config_file_keys=False,
)
parser.add_argument(
"--ra",
type=float,
default=118.0,
help="The right ascention (deg) of the center of the field to query.",
)
parser.add_argument(
"--dec",
type=float,
default=2.6,
help="The declination (deg) of the center of the field to query.",
)
parser.add_argument(
"--width",
type=float,
default=17.0,
help="The width (deg) of the field to query (along RA direction).",
)
parser.add_argument(
"--height",
type=float,
default=17.0,
help="The height (deg) of the field to query (along dec direction).",
)
parser.add_argument(
"--epoch",
"-t",
type=float,
default=None,
help="The epoch for proper motion corrections in years. If not "
"specified, positions are not propagated.",
)
parser.add_argument(
"--magnitude-expression",
default="phot_g_mean_mag",
help="The expression to use as the relevant magnitude estimate.",
)
parser.add_argument(
"--magnitude-limit",
nargs="+",
type=float,
default=12.0,
help="Either maximum magnitude or minimum and maximum magnitude limits "
"to impose.",
)
parser.add_argument(
"--extra-condition",
default=None,
help="An extra condition to impose on the selected sources.",
)
parser.add_argument(
"--verbose",
action="store_true",
help="Print out information about the query being executed.",
)
parser.add_argument(
"--overwrite",
action="store_true",
help="Overwrite existing catalog file.",
)
parser.add_argument(
"--catalog-fname",
default="gaia.fits",
help="The name of the catalog file to create.",
)
parser.add_argument(
"--count-only",
action="store_true",
help="Only count the number of sources in the query.",
)
parser.add_argument(
"--columns",
default=[
"source_id",
"ra",
"dec",
"pmra",
"pmdec",
"phot_g_n_obs",
"phot_g_mean_mag",
"phot_g_mean_flux",
"phot_g_mean_flux_error",
"phot_bp_n_obs",
"phot_bp_mean_mag",
"phot_bp_mean_flux",
"phot_bp_mean_flux_error",
"phot_rp_n_obs",
"phot_rp_mean_mag",
"phot_rp_mean_flux",
"phot_rp_mean_flux_error",
"phot_proc_mode",
"phot_bp_rp_excess_factor",
],
help="The columns to include in the catalog file. Use '*' to include "
"everything.",
)
parser.add_argument(
"--show-stars",
action="store_true",
help="Show the stars in the catalog on a 3-D plot of the sky.",
)
parser.add_argument(
"--run-doctests",
"--test",
action="store_true",
help="Run the unit tests defined in the function docstrings of this "
"module.",
)
return parser.parse_args()
[docs]
def _find_best_fov(points, fov_size):
"""
Find square of a given size that fits the most points.
Example:
>>> import numpy
>>> from autowisp.catalog import _find_best_fov
>>> points = numpy.array([(0.0, 0.0),
... (0.1, 0.0),
... (0.1, 0.1),
... (3.2, 0.0),
... (3.2, 3.1)],
... dtype=[('RAcosDec', float), ('Dec', float)])
>>> _find_best_fov(points, 1.0)
((0.0, 0.0), 3)
>>> _find_best_fov(points, 10.0)
((0.0, 0.0), 5)
>>> _find_best_fov(points, 0.11)
((0.0, 0.0), 3)
>>> _find_best_fov(points, 0.05)
((0.0, 0.0), 1)
>>> _find_best_fov(points, 3.11)
((0.1, 0.0), 4)
>>> points = numpy.array([(0.0, 0.0),
... (0.1, 0.0),
... (0.1, -0.1),
... (3.2, 0.0),
... (3.2, 3.0)],
... dtype=[('RAcosDec', float), ('Dec', float)])
>>> _find_best_fov(points, 1.0)
((0.0, -0.1), 3)
>>> _find_best_fov(points, 10.0)
((0.0, -0.1), 5)
>>> _find_best_fov(points, 0.11)
((0.0, -0.1), 3)
>>> _find_best_fov(points, 0.05)
((0.0, 0.0), 1)
>>> _find_best_fov(points, 3.11)
((0.1, -0.1), 4)
"""
max_count = 0
best_fov = None, None
# Sort points for faster search
points = numpy.sort(points, order="RAcosDec")
sorted_dec = numpy.sort(points["Dec"])
_logger.debug(
"Finding best FOV for %d points:\n%s", points.size, repr(points)
)
for x_start_index in range(points.size):
# No need to check further if peints beyond i are fewer than max found so
# far
if points.size - x_start_index <= max_count:
_logger.debug("Stopping search at x_start_index=%d", x_start_index)
break
x = points[x_start_index]["RAcosDec"]
# Use a more efficient x-mask by limiting the range of points we
# consider.
# Find the range in the sorted array where x is within
# [x, x + fov_size]
x_end_index = numpy.searchsorted(
points["RAcosDec"], x + fov_size, side="right"
)
if x_end_index - x_start_index <= max_count:
continue
for y_ind, y in enumerate(sorted_dec):
# No need try higher y bounds
if sorted_dec.size - y_ind <= max_count:
break
# Count the points within the range [y1, y1 + fov_size]
in_range = numpy.logical_and(
points[x_start_index:x_end_index]["Dec"] >= y,
points[x_start_index:x_end_index]["Dec"] <= y + fov_size,
).sum()
if in_range > max_count:
max_count = in_range
best_fov = (x, y)
_logger.debug(
"For x=%s, y=%s, index range is [%d, %d) with %d points surviving.",
repr(x),
repr(y),
x_start_index,
x_end_index,
in_range,
)
# No need to check further y bounds
if y + fov_size > sorted_dec[-1]:
break
# No need to check further once right edge of window moves past last
# point
if x + fov_size > points["RAcosDec"][-1]:
_logger.debug("Stopping search at x=%s", repr(x))
break
_logger.debug(
"Best FOV found: %s <= x <= %s, %s <= y <= %s with %d points.",
repr(best_fov[0]),
repr(best_fov[0] + fov_size),
repr(best_fov[1]),
repr(best_fov[1] + fov_size),
max_count,
)
return best_fov, max_count
[docs]
def find_outliers(center_ra_dec, max_allowed_offset):
"""
Find frames that are outliers in pointing.
Args:
center_ra_dec(numpy.array): The center RA and Dec of all the frames
being considered. The data type should include columns named
``'RA'`` and ``'Dec'``.
max_allowed_offset(float): The maximum allowed offset in the RA and
Dec directions in degrees. The most outlier frames is iteratively
rejected until this criterion is satisfied.
Returns:
[int]:
List of the indices within ``center_ra_dec`` that were rejected as
outliers.
"""
points = numpy.empty(
center_ra_dec.shape,
dtype=[("RAcosDec", numpy.float64), ("Dec", numpy.float64)],
)
points["RAcosDec"] = center_ra_dec["RA"] * numpy.cos(
center_ra_dec["Dec"] * numpy.pi / 180.0
)
points["Dec"] = center_ra_dec["Dec"]
(min_ra_cos_dec, min_dec), num_inside = _find_best_fov(
points, max_allowed_offset
)
if num_inside < 2 <= center_ra_dec.size:
raise ValueError(
"Attempting to determine field of view to cover frames with no "
"conssistent pointing."
)
outside = (
(points["RAcosDec"] < min_ra_cos_dec)
| (points["RAcosDec"] > min_ra_cos_dec + max_allowed_offset)
| (points["Dec"] < min_dec)
| (points["Dec"] > min_dec + max_allowed_offset)
)
return numpy.arange(points.size)[outside]
# TODO: Maybe simplify later
# pylint: disable=too-many-locals
# pylint: disable=too-many-branches
[docs]
def get_max_abs_corner_xi_eta(
header,
*,
transformation=None,
dr_files=None,
center=None,
max_offset=10.0,
**dr_path_substitutions,
):
"""
Return the max absolute values of the frame corners xi and eta.
Args:
header (astropy.io.fits.Header): The header of the image to find corner
xi and eta of..
transformation: The transformation to assume applies to the image. If
not specified, transformations are read from ``dr_files`` and max is
taken over all of them.
dr_files: The list of DR files to read transformations from. If
transformation is not specified. Ignored if transformation is
specified.
center: The center to assume for the projection. If not specified, the
center is calculated as average of max and min RA and Dec for all
DR files.
max_offset: The largest allowed offset in the RA or Dec directions
between the centers of frames. Outliers by more than this many
degrees are flagged and excluded from consideration.
dr_path_substitutions: The substitutions to use when reading
transformations from DR files.
Returns:
float:
The maximum absolute value of xi over all corners of all frame.
float:
The maximum absolute value of eta over all corners of all frame.
structured array:
The center around which xi, eta are defined. Returned only if center
was not specified and more than one DR file was specified.
array(int):
The indices within DR files that were dropped as outliers in the
pointing. Returned only if center was not specified and more than
one DR file was specified.
"""
assert dr_files or transformation
if transformation:
assert header
center_ra_dec = numpy.empty(
1 if transformation is not None else len(dr_files),
dtype=[("RA", float), ("Dec", float)],
)
change_center = transformation is None or center is not None
corner_coords = numpy.empty(
4 * center_ra_dec.size,
dtype=(
[("RA", float), ("Dec", float)]
if change_center
else [("xi", float), ("eta", float)]
),
)
corner_ind = 0
for this_trans in dr_files if transformation is None else [transformation]:
if transformation is None:
with DataReductionFile(this_trans, "r") as dr_file:
header = dr_file.get_frame_header()
this_trans = Transformation()
this_trans.read_transformation(dr_file, **dr_path_substitutions)
center_ra_dec[corner_ind // 4] = tuple(this_trans.pre_projection_center)
for x in [0.0, float(header["NAXIS1"])]:
for y in [0.0, float(header["NAXIS2"])]:
corner_coords[corner_ind] = this_trans.inverse(
x,
y,
result=("equatorial" if change_center else "pre_projected"),
)
corner_ind += 1
if center is None and transformation is None and len(dr_files) > 1:
center = {}
outliers = numpy.array(sorted(find_outliers(center_ra_dec, max_offset)))
if outliers.size:
keep = numpy.ones(center_ra_dec.size, dtype=bool)
keep[outliers] = False
center_ra_dec = center_ra_dec[keep]
keep = numpy.ones(corner_coords.size, dtype=bool)
for i in range(4):
keep[outliers + i] = False
corner_coords = corner_coords[keep]
for coord in ["Dec", "RA"]:
min_coord = numpy.min(center_ra_dec[coord])
max_coord = numpy.max(center_ra_dec[coord])
center[coord] = 0.5 * (min_coord + max_coord)
return_center = True
else:
return_center = False
if change_center:
xi_eta = numpy.empty(
corner_coords.size, dtype=[("xi", float), ("eta", float)]
)
gnomonic_projection(corner_coords, xi_eta, **center)
else:
xi_eta = corner_coords
result = (
max(abs(xi_eta["xi"].min()), abs(xi_eta["xi"].max())),
max(abs(xi_eta["eta"].min()), abs(xi_eta["eta"].max())),
)
if max(result) > 40.0:
raise RuntimeError(
"Observations with field of view exceeding 40 degrees are not "
"supported."
)
return result + ((center, outliers) if return_center else ())
# pylint: enable=too-many-locals
# pylint: enable=too-many-branches
[docs]
def get_catalog_info(
*,
dr_files=None,
transformation=None,
header=None,
configuration,
**dr_path_substitutions,
):
"""Get the configuration of the catalog needed for this frame."""
assert dr_files or transformation
_logger.debug("Creating catalog info from: %s", repr(configuration))
if transformation is None and len(dr_files) == 1:
with DataReductionFile(dr_files[0], "r") as dr_file:
if header is None:
header = dr_file.get_frame_header()
transformation = Transformation()
transformation.read_transformation(dr_file, **dr_path_substitutions)
if dr_files is None or len(dr_files) == 1:
max_offset = None
else:
max_offset = configuration["max_pointing_offset"]
trans_fov = get_max_abs_corner_xi_eta(
dr_files=dr_files,
transformation=transformation,
header=header,
max_offset=max_offset,
**dr_path_substitutions,
)
if transformation is not None:
trans_fov += (
dict(zip(["RA", "Dec"], transformation.pre_projection_center)),
numpy.array([]),
)
_logger.debug("Catalog FOV info: %s", repr(trans_fov))
pointing_precision = configuration["pointing_precision"] * units.deg
catalog_info = {
"dec_ind": int(
numpy.round(trans_fov[2]["Dec"] * units.deg / pointing_precision)
)
}
catalog_info["dec"] = pointing_precision * catalog_info["dec_ind"]
catalog_info["ra_ind"] = int(
numpy.round(
(trans_fov[2]["RA"] * units.deg * numpy.cos(catalog_info["dec"]))
/ pointing_precision
)
)
catalog_info["ra"] = (
pointing_precision
* catalog_info["ra_ind"]
/ numpy.cos(catalog_info["dec"])
)
catalog_info["magnitude_expression"] = configuration["magnitude_expression"]
if configuration["max_magnitude"] is None:
assert configuration["min_magnitude"] is None
catalog_info["magnitude_limit"] = None
else:
catalog_info["magnitude_limit"] = (
(configuration["max_magnitude"],)
if configuration["min_magnitude"] is None
else (
configuration["min_magnitude"],
configuration["max_magnitude"],
)
)
_logger.debug(
"From transformation estimate half FOV is: %s", repr(trans_fov)
)
frame_fov_estimate = tuple(
numpy.round(
(
max(
2.0 * trans_fov[i] * units.deg,
configuration.get("frame_fov_estimate", (0, 0))[i],
)
+ pointing_precision
)
/ (configuration["fov_precision"] * units.deg)
)
* configuration["fov_precision"]
* units.deg
for i in range(2)
)
_logger.debug(
"Determining catalog query size from: "
"frame_fov_estimate=%s, "
"image_scale_factor=%s",
repr(frame_fov_estimate),
repr(1.0 + 2.0 * configuration["fov_safety_margin"]),
)
catalog_info["width"], catalog_info["height"] = (
fov_size * (1.0 + 2.0 * configuration["fov_safety_margin"])
for fov_size in frame_fov_estimate
)
if header is None:
with DataReductionFile(dr_files[0], "r") as dr_file:
header = dr_file.get_frame_header()
catalog_info["epoch"] = Evaluator(header)(configuration["epoch"])
if dr_files and len(dr_files) > 1:
for dr_fname in dr_files[1:]:
with DataReductionFile(dr_fname, "r") as dr_file:
if (
Evaluator(dr_file.get_frame_header())(
configuration["epoch"]
)
!= catalog_info["epoch"]
):
raise RuntimeError(
"Not all data reduction files to be covered by a single"
" catalog have the same epoch"
)
else:
catalog_info["epoch"] = Evaluator(header)(configuration["epoch"])
catalog_info["columns"] = configuration["columns"]
get_checksum = md5()
for cfg in sorted(catalog_info.items()):
get_checksum.update(repr(cfg).encode("ascii"))
catalog_info["fname"] = configuration["fname"].format(
**dict(header), **catalog_info, checksum=get_checksum.hexdigest()
)
_logger.debug("Created catalog info: %s", repr(catalog_info))
return catalog_info, trans_fov[2], trans_fov[3]
# No god way to simplify
# pylint: disable=too-many-branches
[docs]
def ensure_catalog(
*,
dr_files=None,
transformation=None,
header=None,
configuration,
lock=nullcontext(),
return_metadata=True,
**dr_path_substitutions,
):
"""Re-use or create astrometry catalog suitable for the given frame."""
assert dr_files or transformation
catalog_info, query_center, outliers = get_catalog_info(
transformation=transformation,
dr_files=dr_files,
header=header,
configuration=configuration,
**dr_path_substitutions,
)
with lock:
if path.exists(catalog_info["fname"]):
with fits.open(catalog_info["fname"]) as cat_fits:
catalog_header = cat_fits[1].header
# pylint: disable=too-many-boolean-expressions
if (
numpy.abs(
catalog_header["EPOCH"]
- catalog_info["epoch"].to_value("yr")
)
> 0.25
):
raise RuntimeError(
f'Catalog {catalog_info["fname"]} '
f'has epoch {catalog_header["EPOCH"]!r}, '
f'but {catalog_info["epoch"]!r} is needed'
)
if (
catalog_header["MAGEXPR"]
!= catalog_info["magnitude_expression"]
):
raise RuntimeError(
f'Catalog {catalog_info["fname"]} has '
f'magnitude expression {catalog_header["MAGEXPR"]!r} '
f'instead of {catalog_info["magnitude_expression"]!r}'
)
if (
catalog_header.get("MAGMIN") is not None
and len(catalog_info["magnitude_limit"]) == 2
and (
catalog_header["MAGMIN"]
> catalog_info["magnitude_limit"][0]
)
):
raise RuntimeError(
f'Catalog {catalog_info["fname"]} excludes '
f'sources brighter than {catalog_header["MAGMIN"]!r} '
f'but {catalog_info["magnitude_limit"][0]!r} are '
"required."
)
if catalog_header.get("MAGMAX") is not None and (
catalog_header["MAGMAX"]
< catalog_info["magnitude_limit"][-1]
):
raise RuntimeError(
f'Catalog {catalog_info["fname"]} excludes '
f'sources fainter than {catalog_header["MAGMAX"]!r} but'
f' {catalog_info["magnitude_limit"][-1]!r} are '
"required."
)
if catalog_header["WIDTH"] < catalog_info["width"].to_value(
units.deg
):
raise RuntimeError(
f'Catalog {catalog_info["fname"]} width '
f'{catalog_header["WIDTH"]!r} is less than the required'
f' {catalog_info["width"]!r}'
)
if catalog_header["HEIGHT"] < catalog_info["height"].to_value(
units.deg
):
raise RuntimeError(
f'Catalog {catalog_info["fname"]} height '
f'{catalog_header["HEIGHT"]!r} is less than the '
f'required {catalog_info["height"]!r}'
)
if (
catalog_header["RA"] - query_center["RA"]
) * units.deg * numpy.cos(
catalog_header["DEC"] * units.deg
) > configuration[
"pointing_precision"
] * units.deg:
raise RuntimeError(
f'Catalog {catalog_info["fname"]} center RA '
f'{catalog_header["RA"]!r} is too far from the '
f'required RA={query_center["RA"]!r}'
)
if (
catalog_header["DEC"] - query_center["Dec"]
) * units.deg > configuration["pointing_precision"] * units.deg:
raise RuntimeError(
f'Catalog {catalog_info["fname"]} center Dec '
f'{catalog_header["DEC"]!r} is too far from the '
f'required Dec={query_center["Dec"]!r}'
)
filter_expr = configuration["filter"]
if filter_expr is None or header["CLRCHNL"] not in filter_expr:
filter_expr = []
else:
filter_expr = ["(" + filter_expr[header["CLRCHNL"]] + ")"]
if len(catalog_info["magnitude_limit"]) == 2 and (
catalog_header.get("MAGMIN") is None
or catalog_header["MAGMIN"]
< catalog_info["magnitude_limit"][0]
):
filter_expr.append(
f'(magnitude > {catalog_info["magnitude_limit"][0]!r})'
)
if catalog_header.get("MAGMAX") is None or (
catalog_header["MAGMAX"]
> catalog_info["magnitude_limit"][-1]
):
filter_expr.append(
'(magnitude < {catalog_info["magnitude_limit"][-1]!r})'
)
# pylint: enable=too-many-boolean-expressions
return (
read_catalog_file(
cat_fits,
filter_expr=(
" and ".join(filter_expr) if filter_expr else None
),
return_metadata=return_metadata,
),
outliers,
catalog_info["fname"],
)
del catalog_info["ra_ind"]
del catalog_info["dec_ind"]
create_catalog_file(**catalog_info, verbose=True)
return (
read_catalog_file(
catalog_info["fname"], return_metadata=return_metadata
),
outliers,
catalog_info["fname"],
)
# pylint: enable=too-many-branches
[docs]
def check_catalog_coverage(
header, transformation, catalog_header, safety_margin
):
"""
Return True iff te catalog covers the frame fully including a safety margin.
Args:
header(dict-like): The header of the frame being astrometried.
transformation(dict): The transformation to assume applies to the
given image. Should contain entries for ``'trans_x'``,
``'trans_y'``, ``'ra_cent'`` and ``'dec_cent'``.
catalog_header(dict-like): The header of the catalog being used for
astrometry.
safety_margin(float): The absolute values of xi and eta
(relative to the catalog center) corresponding to the corners of the
frame increased by this fraction must be smaller than the field of
view of the catalogfor the frame to be considered covered.
Returns:
bool:
Whether the catalog covers the frame fully including the safety
margin.
"""
width, height = get_max_abs_corner_xi_eta(
header=header,
transformation=transformation,
center={"RA": catalog_header["RA"], "Dec": catalog_header["DEC"]},
)
factor = 2.0 * (1.0 + safety_margin)
width *= factor
height *= factor
_logger.debug(
"Coverage with safety requires FOV (width: %s deg, height: %s def)",
repr(width),
repr(height),
)
return width < catalog_header["WIDTH"] and height < catalog_header["HEIGHT"]
[docs]
def show_stars(catalog_fname):
"""Show the stars in the catalog on a 3-D plot of the sky."""
phi, theta = numpy.mgrid[
0.0 : numpy.pi / 2.0 : 10j, 0.0 : 2.0 * numpy.pi : 10j
]
sphere_x = numpy.sin(phi) * numpy.cos(theta)
sphere_y = numpy.sin(phi) * numpy.sin(theta)
sphere_z = numpy.cos(phi)
fig = pyplot.figure()
axes = fig.add_subplot(111, projection="3d")
pyplot.gca().plot_surface(
sphere_x,
sphere_y,
sphere_z,
rstride=1,
cstride=1,
color="c",
alpha=0.6,
linewidth=1,
)
stars = read_catalog_file(catalog_fname)
stars_x = numpy.cos(numpy.radians(stars["Dec"])) * numpy.cos(
numpy.radians(stars["RA"])
)
stars_y = numpy.cos(numpy.radians(stars["Dec"])) * numpy.sin(
numpy.radians(stars["RA"])
)
stars_z = numpy.sin(numpy.radians(stars["Dec"]))
axes.scatter(stars_x, stars_y, stars_z, color="k", s=20)
pyplot.show()
[docs]
def main(config):
"""Avoid polluting global namespace."""
if config.run_doctests:
doctest.testmod(verbose=config.verbose)
return
kwargs = {
"ra": config.ra * units.deg,
"dec": config.dec * units.deg,
"width": config.width * units.deg,
"height": config.height * units.deg,
"epoch": config.epoch * units.yr if config.epoch is not None else None,
"magnitude_expression": config.magnitude_expression,
"magnitude_limit": config.magnitude_limit,
"columns": config.columns,
"verbose": config.verbose,
"overwrite": config.overwrite,
"count_only": config.count_only,
}
if config.extra_condition is not None:
kwargs["condition"] = config.extra_condition
create_catalog_file(config.catalog_fname, **kwargs)
if config.show_stars and not config.count_only:
show_stars(config.catalog_fname)
if __name__ == "__main__":
main(parse_command_line())