Source code for autowisp.astrometry.astrometry
#!/usr/bin/env python3
"""Fit for a transformation between sky and image coordinates."""
import logging
from tempfile import TemporaryDirectory
import subprocess
import os
from traceback import format_exc
import time
from urllib.request import Request, urlopen
from urllib.error import URLError
import shutil
import numpy
from numpy.lib.recfunctions import structured_to_unstructured
from scipy import linalg
from scipy import spatial
from scipy.optimize import fsolve
from astropy.io import fits
from autowisp.astrometry.map_projections import (
gnomonic_projection,
inverse_gnomonic_projection,
)
from autowisp.astrometry.astrometry_net_client import (
Client as AstrometryNetClient,
)
_logger = logging.getLogger(__name__)
# pylint:disable=R0913
# pylint:disable=R0914
# pylint:disable=R0915
# pylint:disable=C0103
[docs]
def transformation_matrix(astrometry_order, xi, eta):
"""
Constructs the transformation matrix for a given astrometry order.
Args:
astrometry_order(int): The order of the transformation to fit
xi(numpy.ndarray): from projected coordinates
eta(numpy.ndarray): from projected coordinates
Returns:
trans_matrix(numpy.ndarray): transformation matrix
Notes:
Ex: for astrometry_order 2: 1, xi, eta, xi^2, xi*eta, eta^2
"""
# TODO: alocate matrix first with right size and then fill, instead of doing
# it the slow way below: new method seems to be not faster.
# if it is much slower than the initial, roll back!
# trans_matrix = numpy.ones((eta.shape[0], 1))
# for i in range(1, astrometry_order + 1):
# for j in range(i + 1):
# trans_matrix = numpy.block(
# [trans_matrix, xi ** (i - j) * eta ** j]
# )
num_terms = (astrometry_order + 1) * (astrometry_order + 2) // 2
n_points = len(xi)
trans_matrix = numpy.empty((n_points, num_terms))
trans_matrix[:, 0] = 1
col = 1
for i in range(1, astrometry_order + 1):
for j in range(i + 1):
trans_matrix[:, col] = ((xi ** (i - j)) * (eta**j)).ravel()
col += 1
return trans_matrix
[docs]
def find_ra_dec(xieta_guess, trans_x, trans_y, radec_cent, frame_x, frame_y):
"""
Find the (xi, eta) that map to given coordinates in the frame.
Args:
xieta_guess(numpy array): Starting point for the solver trying find
(xi, eta) that map to the given coordinates.
trans_x(numpy array): transformation matrix for x
trans_y(numpy array): transformation matrix for y
radec_cent(dict): The RA and Dec of the center of the gnomonic
projection defining (xi, eta).
frame_x(float): x coordinate for which to find RA, Dec.
frame_y(float): y coordinate for which to find RA, Dec.
Returns:
new_xieta_cent(numpy array): the new center function for (xi, eta)
"""
assert trans_x.size == trans_y.size
astrometry_order = (numpy.sqrt(1.0 + 8.0 * trans_x.size) - 3.0) / 2.0
assert numpy.allclose(astrometry_order, numpy.round(astrometry_order), atol=1e-10)
astrometry_order = numpy.rint(astrometry_order).astype(int)
def equations(xieta_cent):
"""The equations that need to be solved to find the center."""
xi = xieta_cent[0]
eta = xieta_cent[1]
new_xieta_cent = numpy.empty(2)
new_xieta_cent[0] = trans_x[0, 0] - frame_x
new_xieta_cent[1] = trans_y[0, 0] - frame_y
k = 1
for i in range(1, astrometry_order + 1):
for j in range(i + 1):
new_xieta_cent[0] = (
new_xieta_cent[0] + trans_x[k, 0] * xi ** (i - j) * eta**j
)
new_xieta_cent[1] = (
new_xieta_cent[1] + trans_y[k, 0] * xi ** (i - j) * eta**j
)
k = k + 1
return new_xieta_cent
xieta_cent = numpy.empty(1, dtype=[("xi", float), ("eta", float)])
xieta_cent["xi"], xieta_cent["eta"] = fsolve(equations, xieta_guess)
source = numpy.empty(1, dtype=[("RA", float), ("Dec", float)])
inverse_gnomonic_projection(source, xieta_cent, **radec_cent)
return {"RA": source["RA"][0], "Dec": source["Dec"][0]}
[docs]
def estimate_transformation_from_corr(
initial_corr, ra_cent, dec_cent, tweak_order, astrometry_order
):
"""
Estimate the transformation from astrometry.net correspondence file.
Args:
initial_corr(structured numpy array): The correspondence file
containing field_x, field_y, index_ra, and index_dec
ra_cent(float): Estimate of the RA of the center of the frame
dec_cent(float): Estimate of the Dec of the center of the frame
tweak_order(int): The order of the astrometry.net transformation
astrometry_order(int): The order of the transformation that will be
fit. Fills terms of higher order than `tweak_order` with zeros.
Returns:
matrix:
Estimate of the transformation x(xi, eta)
matrix:
Estimate of the transformation y(xi, eta)
"""
projected = numpy.empty(
initial_corr.shape[0], dtype=[("xi", float), ("eta", float)]
)
radec_center = {"RA": ra_cent, "Dec": dec_cent}
gnomonic_projection(initial_corr, projected, **radec_center)
xi = projected["xi"][numpy.newaxis].T
eta = projected["eta"][numpy.newaxis].T
trans_matrix = transformation_matrix(tweak_order, xi, eta)
num_trans_terms = ((astrometry_order + 1) * (astrometry_order + 2)) // 2
num_tweak_terms = ((tweak_order + 1) * (tweak_order + 2)) // 2
trans_x = numpy.zeros(num_trans_terms)
trans_y = numpy.zeros(num_trans_terms)
trans_x[:num_tweak_terms] = linalg.lstsq(trans_matrix, initial_corr["x"])[0]
trans_y[:num_tweak_terms] = linalg.lstsq(trans_matrix, initial_corr["y"])[0]
return trans_x[numpy.newaxis].T, trans_y[numpy.newaxis].T
[docs]
class TempAstrometryFiles:
"""Context manager for the temporary files needed for astrometry."""
[docs]
def __init__(self, file_types=("sources", "corr", "axy", "config")):
"""Create all required temporary files."""
self._temp_dir_obj = TemporaryDirectory()
self.temp_dir_path = self._temp_dir_obj.name
self._file_types = file_types
for file_type in self._file_types:
full_path = os.path.join(self.temp_dir_path, file_type)
setattr(self, file_type + "_fname", full_path)
[docs]
def __enter__(self):
"""Return the filenames of the temporary files."""
return tuple(
getattr(self, file_type + "_fname") for file_type in self._file_types
)
[docs]
def __exit__(self, *ignored_args, **ignored_kwargs):
"""Close and delete the temporary files."""
self._temp_dir_obj.cleanup()
[docs]
def create_sources_file(xy_extracted, sources_fname):
"""Create a FITS BinTable file with the given name containing
the extracted sources.
Returns: an array containing x-y extracted sources
"""
x_extracted = fits.Column(name="x", format="D", array=xy_extracted["x"])
y_extracted = fits.Column(name="y", format="D", array=xy_extracted["y"])
xyls = fits.BinTableHDU.from_columns([x_extracted, y_extracted])
xyls.writeto(sources_fname)
return xy_extracted
[docs]
def create_config_file(config_fname, fov_range, anet_indices):
"""Create configuration file set up to solve imaves FOV in given range."""
with open(config_fname, "w", encoding="utf-8") as config_file:
if min(fov_range) < 2.0:
config_file.write(f"add_path {anet_indices[0]}\n")
if max(fov_range) > 0.5:
config_file.write(f"add_path {anet_indices[1]}\n")
config_file.write("autoindex\n")
with open(config_fname, "r", encoding="utf-8") as config_file:
_logger.debug("Astrometry.net engine config:\n%s", config_file.read())
[docs]
def get_initial_corr_local(
header, xy_extracted, tweak_order_range, fov_range, anet_indices
):
"""Get inital extracted to catalog source match using ``solve-field``."""
with TempAstrometryFiles() as (
sources_fname,
corr_fname,
axy_fname,
config_fname,
):
xy_extracted = create_sources_file(xy_extracted, sources_fname)
create_config_file(config_fname, fov_range, anet_indices)
for tweak in range(tweak_order_range[0], tweak_order_range[1] + 1):
solve_field_command = [
"solve-field",
sources_fname,
"--backend-config",
config_fname,
"--corr",
corr_fname,
"--width",
str(header["NAXIS1"]),
"--height",
str(header["NAXIS2"]),
"--tweak-order",
str(tweak),
"--match",
"none",
"--wcs",
"none",
"--index-xyls",
"none",
"--rdls",
"none",
"--solved",
"none",
"--axy",
axy_fname,
"--no-plots",
"--scale-low",
repr(fov_range[0]),
"--scale-high",
repr(fov_range[1]),
"--overwrite",
]
_logger.debug(
"Starting solve-field command:\n\t%s",
"\n\t\t".join(solve_field_command),
)
try:
subprocess.run(solve_field_command, check=True)
except subprocess.SubprocessError:
_logger.critical("solve-field failed with error:\n%s", format_exc())
continue
if not os.path.isfile(corr_fname):
_logger.critical(
"Correspondence file %s not created.", repr(corr_fname)
)
return "solve-field failed", 0
with fits.open(corr_fname, mode="readonly") as corr:
result = numpy.copy(corr[1].data[:])
if result.size > ((tweak + 1) * (tweak + 2)) // 2:
return result, tweak
return "solve-field failed", 0
[docs]
def get_initial_corr_web(header, xy_extracted, tweak_order_range, fov_range, api_key):
"""Get initial extracted to catalog source match using web astrometry.net."""
config = {
"allow_commercial_use": "n",
"allow_modifications": "n",
"publicly_visible": "n",
"scale_lower": fov_range[0],
"scale_upper": fov_range[1],
"scale_type": "ul",
"scale_units": "degwidth",
"image_width": header["NAXIS1"],
"image_height": header["NAXIS2"],
# 'center_ra',
# 'center_dec',
# 'radius',
# 'downsample_factor',
# 'positional_error',
"x": xy_extracted["x"],
"y": xy_extracted["y"],
}
client = AstrometryNetClient()
while True:
try:
client.login(api_key)
break
except URLError as err:
_logger.error(
"Failed to connect to astrometry.net server: %s\nRetrying...",
err.reason,
)
time.sleep(20)
for tweak_order in range(tweak_order_range[0], tweak_order_range[1] + 1):
config["tweak_order"] = tweak_order
upload_result = client.upload(**config)
if upload_result["status"] != "success":
return upload_result["status"], 0
assert "subid" in upload_result
solved_job_id = None
while solved_job_id is None:
time.sleep(5)
submission_status = client.sub_status(upload_result["subid"], justdict=True)
_logger.debug("Astrometry.net submission status: %s", submission_status)
jobs = submission_status.get("jobs", [])
job_id = None
if len(jobs):
for job_id in jobs:
if job_id is not None:
break
if job_id is not None:
_logger.debug("Selecting job id %s", job_id)
solved_job_id = job_id
while True:
job_status = client.job_status(solved_job_id, justdict=True).get(
"status", ""
)
_logger.debug("Got job status: %s", job_status)
if job_status in ["success", "failure"]:
break
time.sleep(5)
if job_status == "failure":
return "web solve failed", 0
corr_url = client.apiurl.replace("/api/", f"/corr_file/{solved_job_id:d}")
with TempAstrometryFiles(("corr",)) as (corr_fname,):
_logger.debug("Retrieving file from '%s' to '%s'", corr_url, corr_fname)
headers = {
"Referer": "https://nova.astrometry.net/api/login",
"Cookie": f"session={client.session}",
}
req = Request(corr_url, headers=headers)
with urlopen(req) as remote_corr, open(corr_fname, "wb") as local_corr:
shutil.copyfileobj(remote_corr, local_corr)
with fits.open(corr_fname, mode="readonly") as corr:
result = numpy.copy(corr[1].data[:])
if result.size > (tweak_order + 1) * (tweak_order + 2) // 2:
return result, tweak_order
return "web solve failed", 0
[docs]
def get_initial_corr(
*, dr_file, xy_extracted, config, header=None, web_lock=None
):
"""Attempt to estimate the sky-to-frame transformation for given DR file."""
if header is None:
header = dr_file.get_frame_header()
initial_corr_arg = (
header,
xy_extracted,
config["tweak_order_range"],
config["fov_range"],
)
if (
"anet_indices" in config
and os.path.exists(config["anet_indices"][0])
and os.path.exists(config["anet_indices"][1])
):
return get_initial_corr_local(
*initial_corr_arg, config["anet_indices"]
)
with web_lock:
return get_initial_corr_web(
*initial_corr_arg, config["anet_api_key"]
)
[docs]
def estimate_transformation(
*, config, **initial_corr_kwarg
):
"""Attempt to estimate the sky-to-frame transformation for given DR file."""
field_corr, tweak_order = get_initial_corr(
config=config, **initial_corr_kwarg
)
if tweak_order == 0:
return None, None, field_corr
initial_corr = numpy.zeros(
(field_corr["field_x"].shape),
dtype=[("x", ">f8"), ("y", ">f8"), ("RA", ">f8"), ("Dec", ">f8")],
)
initial_corr["x"] = field_corr["field_x"]
initial_corr["y"] = field_corr["field_y"]
initial_corr["RA"] = field_corr["index_ra"]
initial_corr["Dec"] = field_corr["index_dec"]
return estimate_transformation_from_corr(
initial_corr=initial_corr,
tweak_order=tweak_order,
astrometry_order=config["astrometry_order"],
ra_cent=config["ra_cent"],
dec_cent=config["dec_cent"],
) + ("success",)
[docs]
def refine_transformation(
*,
astrometry_order,
max_srcmatch_distance,
max_iterations,
trans_threshold,
trans_x,
trans_y,
ra_cent,
dec_cent,
x_frame,
y_frame,
xy_extracted,
catalog,
min_source_safety_factor=5.0,
):
"""
Iterate the process until we get a transformation that
its difference from the previous one is less than a threshold
Args:
astrometry_order(int): The order of the transformation to fit
max_srcmatch_distance(float): The upper bound distance in the
KD tree
trans_threshold(float): The threshold for the difference of two
consecutive transformations
trans_x(numpy array): Initial estimate for the x transformation
matrix
trans_y(numpy array): Initial estimate for the x transformation
matrix
ra_cent(float): Initial estimate for the RA of the center of the
frame
dec_cent(float): Initial estimate for the Dec of the center of the
frame
x_frame(float): length of the frame in pixels
y_frame(float): width of the frame in pixels
xy_extracted(structured numpy array): x and y of the extracted
sources of the frame
catalog(pandas.DataFrame): The catalog of sources to match to
Returns:
trans_x(2D numpy array):
the coefficients of the x(xi, eta) transformation
trans_y(2D numpy array):
the coefficients of the y(xi, eta) transformation
cat_extracted_corr(structured numpy array):
the catalogs extracted correspondence indexes
res_rms(float): the residual
ratio(float): the ratio of matched to unmatched
ra_cent(float): the new RA center array
dec_cent(float): the new Dec center array
"""
x_cent = x_frame / 2.0
y_cent = y_frame / 2.0
xy_extracted = structured_to_unstructured(xy_extracted)[:, 0:2]
# xy_extracted = xy_extracted[:, 0:2]
counter = 0
x_transformed = numpy.inf
y_transformed = numpy.inf
logger = logging.getLogger(__name__)
kdtree = spatial.KDTree(xy_extracted)
while True:
counter += 1
if counter > 1:
# TODO: fix pylint disables here
# pylint:disable=used-before-assignment
ra_cent = cent_new["RA"]
dec_cent = cent_new["Dec"]
# pylint:enable=used-before-assignment
radec_cent = {"RA": ra_cent, "Dec": dec_cent}
projected = numpy.empty(catalog.shape[0], dtype=[("xi", float), ("eta", float)])
gnomonic_projection(catalog, projected, **radec_cent)
xi = projected["xi"].reshape(-1, 1) # Reshape to (n, 1)
eta = projected["eta"].reshape(-1, 1) # Reshape to (n, 1)
trans_matrix_xy = transformation_matrix(astrometry_order, xi, eta)
old_x_transformed = x_transformed
old_y_transformed = y_transformed
x_transformed = trans_matrix_xy @ trans_x
y_transformed = trans_matrix_xy @ trans_y
in_frame = numpy.logical_and(
numpy.logical_and(x_transformed > 0, x_transformed < x_frame),
numpy.logical_and(y_transformed > 0, y_transformed < y_frame),
).flatten()
diff = numpy.sqrt(
(old_x_transformed - x_transformed) ** 2
+ (old_y_transformed - y_transformed) ** 2
).flatten()[in_frame]
logger.debug("diff: %s", repr(diff.max()))
if not (diff > trans_threshold).any() or counter > max_iterations:
# pylint:disable=used-before-assignment
cat_extracted_corr = numpy.empty((n_matched, 2), dtype=int)
cat_extracted_corr[:, 0] = numpy.arange(catalog.shape[0])[matched]
cat_extracted_corr[:, 1] = ix[matched]
# Exclude the sources that are not within the frame:
return (
trans_x,
trans_y,
cat_extracted_corr,
res_rms,
ratio,
ra_cent,
dec_cent,
counter <= max_iterations,
)
# pylint:enable=used-before-assignment
xy_transformed = numpy.block([x_transformed, y_transformed])
d, ix = kdtree.query(xy_transformed, distance_upper_bound=max_srcmatch_distance)
result, count = numpy.unique(ix, return_counts=True)
for multi_match_i in result[count > 1][:-1]:
bad_match = ix == multi_match_i
d[bad_match] = numpy.inf
ix[bad_match] = result[-1]
matched = numpy.isfinite(d)
n_matched = matched.sum()
n_extracted = len(xy_extracted)
# TODO: add weights to residual and to the fit eventually
res_rms = numpy.sqrt(numpy.square(d[matched]).mean())
ratio = n_matched / n_extracted
logger.debug("# of matched: %d out of %d", n_matched, n_extracted)
matched_sources = numpy.empty(
n_matched,
dtype=[("RA", float), ("Dec", float), ("x", float), ("y", float)],
)
j = 0
k = -1
for i in range(ix.size):
k += 1
if not numpy.isinf(d[i]):
matched_sources["RA"][j] = catalog["RA"].iloc[k]
matched_sources["Dec"][j] = catalog["Dec"].iloc[k]
matched_sources["x"][j] = xy_extracted[ix[i], 0]
matched_sources["y"][j] = xy_extracted[ix[i], 1]
j += 1
cent_new = find_ra_dec(
numpy.array([numpy.mean(xi), numpy.mean(eta)]),
trans_x,
trans_y,
radec_cent,
x_cent,
y_cent,
)
projected_new = numpy.empty(
matched_sources.shape[0], dtype=[("xi", float), ("eta", float)]
)
gnomonic_projection(matched_sources, projected_new, **cent_new)
trans_matrix = transformation_matrix(
astrometry_order,
projected_new["xi"].reshape(projected_new["xi"].size, 1),
projected_new["eta"].reshape(projected_new["eta"].size, 1),
)
print(trans_matrix.shape)
if trans_matrix.shape[0] <= (min_source_safety_factor * trans_matrix.shape[1]):
raise ValueError(
"The number of equations is "
"insufficient to solve transformation "
"coefficients"
)
trans_x = linalg.lstsq(
trans_matrix,
matched_sources["x"].reshape(matched_sources["x"].size, 1),
)[0]
trans_y = linalg.lstsq(
trans_matrix,
matched_sources["y"].reshape(matched_sources["y"].size, 1),
)[0]