Source code for autowisp.processing_steps.add_images_to_db

#!/usr/bin/env python3

"""Register new images with the database."""

from datetime import timedelta
import logging

from astropy import units
from astropy.time import Time
from astropy.coordinates import EarthLocation, SkyCoord

from autowisp.multiprocessing_util import setup_process
from autowisp import Evaluator
from autowisp.file_utilities import find_fits_fnames
from autowisp.processing_steps.manual_util import ManualStepArgumentParser
from autowisp.database.interface import Session

# false positive due to unusual importing
# pylint: disable=no-name-in-module
from autowisp.database.data_model.provenance import (
    Observer,
    Camera,
    Telescope,
    Mount,
    Observatory,
)
from autowisp.database.data_model import (
    Image,
    ImageType,
    ObservingSession,
    Target,
)

# pylint: enable=no-name-in-module

_logger = logging.getLogger(__name__)


[docs] def parse_command_line(*args): """Return the parsed command line arguments.""" if args: inputtype = "" else: inputtype = "raw" parser = ManualStepArgumentParser( description=__doc__, input_type=inputtype, add_exposure_timing=True ) parser.add_argument( "--observer", default="ORIGIN", help="The name of the observer who/which collected the images. Can " "be arbitrary expression involving header keywords. Must already have " "an entry in the ``observer`` table.", ) parser.add_argument( "--camera-serial-number", "--cam-sn", default="CAMSN", help="The serial number of the camera which collected the images. Can " "be arbitrary expression involving header keywords. Must already have " "an entry in the ``camera`` table.", ) parser.add_argument( "--telescope-serial-number", "--tel-sn", default="INTSN", help="The serial number of the telescope (lens) which collected the " "images (or some other unique and persistent identifier of it). Can " "be arbitrary expression involving header keywords. Must already have " "an entry in the ``telescope`` table.", ) parser.add_argument( "--mount-serial-number", "--mount-sn", default="OBSERVER", help="The serial number of the mount which collected the " "images (or some other unique and persistent identifier of it). Can " "be arbitrary expression involving header keywords. " "Must already have an entry in the ``mount`` table.", ) parser.add_argument( "--observatory", default=None, help="The name of the observatory from where the images were collected." " Can be arbitrary expression involving header keywords. Must already " "have an entry in the ``observatory`` table. If not specified, " "--observatory-location is used to determine the observatory.", ) parser.add_argument( "--observatory-location", metavar=("LATITUDE", "LONGITUDE", "ALTITUDE"), default=["LAT_OBS", "LONG_OBS", "ALT_OBS"], nargs=3, help="The latitude and longitude of the observatory from where the " "images were collected. Can be arbitrary expression involving header " "keywords. Must already have an entry in the ``observatory`` table " "within approximately 100km. Only used if --observatory is not " "specified.", ) parser.add_argument( "--target-ra", "--ra", default="RA_MNT", help="The RA targetted by the observations in degrees. Can be arbitrary" " expression involving header keywords. If target table already " "contains an entry for this target, the RA must match within 1%% of the" " field of view or an error is raised. It can be left unspecified if " "the target is already in the target table, in which case it will be " "identified by name.", ) parser.add_argument( "--target-dec", "--dec", default="DEC_MNT", help="The Dec targetted by the observations. See --target-ra for " "details.", ) parser.add_argument( "--target-name", "--target", default="FIELD", help="The name of the targetted area of the sky. Can be arbitrary " "expression involving header keywords. If not already in the target " "table it is automatically added.", ) parser.add_argument( "--observing-session-label", "--session-label", "--session", default="SEQID", help="Unique label for the observing session. Can be arbitrary " "expression involving header keywords. If not already in the " "observing_session table it is automatically added. It will also be " "added as ``OBS-SESN`` keyword to the calibrated images.", ) parser.add_argument( "--image-type", default=None, help="Header expression that evaluates to the image type. If it is not " "one of the image types listed in the database, the image is ignored. " "If not specified, the individual checks below are used instead.", ) parser.add_argument( "--ignore-unknown-image-types", action="store_true", default=False, help="If this option is passed and an image of an unknown type is " "encountered it will not be added tot he database.", ) # False positivie # pylint: disable=no-member with Session.begin() as db_session: # pylint: enable=no-member for image_type in [ record[0] for record in db_session.query(ImageType.name).all() ]: parser.add_argument( f"--{image_type}-check", default=str(image_type == "object"), help="Header expression that evaluates to True if the image is " f"a {image_type} frame.", ) return parser.parse_args(*args)
[docs] def get_or_create_target( image_type, header_eval, configuration, db_session, field_of_view ): """Return the target corresponding to the image (create if necessary).""" target_name = header_eval(configuration["target_name"]) db_target = ( db_session.query(Target).filter_by(name=target_name).one_or_none() ) no_pointing_imtypes = ["zero", "dark", "flat"] if image_type in no_pointing_imtypes: image_target = {"ra": None, "dec": None} else: image_target = { "ra": header_eval(configuration["target_ra"]), "dec": header_eval(configuration["target_dec"]), } if db_target is None: # False positive # pylint: disable=not-callable db_target = Target( **image_target, name=header_eval(configuration["target_name"]) ) # pylint: enable=not-callable db_session.add(db_target) elif image_type not in no_pointing_imtypes: image_target = SkyCoord( image_target["ra"] * units.deg, image_target["dec"] * units.deg ) _logger.debug( f"Checking target {target_name} for {image_type!r} image. " f"From DB: {db_target!r} vs image: {image_target!r}" ) assert ( image_target.separation( SkyCoord( ra=db_target.ra * units.deg, dec=db_target.dec * units.deg ) ) < 0.01 * field_of_view ) return db_target
[docs] def _match_observatory(db_observatory, image_location): """True iff the observatory matches the image location.""" db_location = EarthLocation( lat=db_observatory.latitude * units.deg, lon=db_observatory.longitude * units.deg, height=db_observatory.altitude * units.m, ) return ( (image_location.x - db_location.x) ** 2 + (image_location.y - db_location.y) ** 2 + (image_location.z - db_location.z) ** 2 ) ** 0.5 < 100 * units.km
[docs] def get_observatory(header_eval, configuration, db_session): """Return the observatory corresponding to the image (must exist).""" _logger.debug( "Observatory location: %s", repr(configuration["observatory_location"]) ) latitude, longitude, altitude = ( header_eval(expression) for expression in configuration["observatory_location"] ) image_location = EarthLocation( lat=latitude * units.deg, lon=longitude * units.deg, height=altitude * units.m, ) if configuration["observatory"] is None: observatory = None for db_observatory in db_session.query(Observatory).all(): if _match_observatory(db_observatory, image_location): assert observatory is None observatory = db_observatory else: observatory = ( db_session.query(Observatory) .filter_by(name=header_eval(configuration["observatory"])) .one() ) assert _match_observatory(observatory, image_location) return observatory
[docs] def get_or_create_observing_session( image_type, header_eval, configuration, db_session ): """Return the observing session the image is part of (create if needed).""" observer = ( db_session.query(Observer) .filter_by(name=header_eval(configuration["observer"])) .one() ) camera = ( db_session.query(Camera) .filter_by( serial_number=header_eval(configuration["camera_serial_number"]) ) .one() ) telescope = ( db_session.query(Telescope) .filter_by( serial_number=header_eval(configuration["telescope_serial_number"]) ) .one() ) mount = ( db_session.query(Mount) .filter_by( serial_number=header_eval(configuration["mount_serial_number"]) ) .one() ) observatory = get_observatory(header_eval, configuration, db_session) field_of_view = ( max(camera.camera_type.x_resolution, camera.camera_type.y_resolution) * camera.camera_type.pixel_size * units.um / (telescope.telescope_type.focal_length * units.mm) ) * units.rad target = get_or_create_target( image_type, header_eval, configuration, db_session, field_of_view ) exposure_start = None for time_format in ("utc", "jd"): if configuration[f"exposure_start_{time_format}"]: exposure_start = Time( header_eval(configuration[f"exposure_start_{time_format}"]), format=None if time_format == "utc" else time_format, ) header_eval.symtable["JD-OBS"] = exposure_start.jd + header_eval( configuration["exposure_seconds"] ) / (2.0 * 24.0 * 3600.0) exposure_start = exposure_start.utc.to_value("datetime") assert exposure_start is not None exposure_end = exposure_start + timedelta( seconds=header_eval(configuration["exposure_seconds"]) ) result = ( db_session.query(ObservingSession) .filter_by(label=header_eval(configuration["observing_session_label"])) .one_or_none() ) if result is None: result = ObservingSession( observer_id=observer.id, camera_id=camera.id, telescope_id=telescope.id, mount_id=mount.id, observatory_id=observatory.id, target_id=target.id, label=header_eval(configuration["observing_session_label"]), start_time_utc=exposure_start, end_time_utc=exposure_end, ) else: if ( result.observer_id != observer.id or result.camera_id != camera.id or result.telescope_id != telescope.id or result.mount_id != mount.id or result.observatory_id != observatory.id or result.target_id != target.id ): raise RuntimeError( "Mismatch between observing session and other header " "information:\n\t" + "\n\t".join( [ f'{what} ID: header = {getattr(result, what + "_id")} ' f"session = {locals()[what]}" for what in [ "observer", "camera", "telescope", "mount", "observatory", "target", ] ] ) ) if exposure_start < result.start_time_utc: result.start_time_utc = exposure_start if exposure_end > result.end_time_utc: result.end_time_utc = exposure_end return result
[docs] def create_image(image_fname, header_eval, configuration, db_session): """Create the database Image entry corresponding to the given file.""" recognized_image_types = [ record[0] for record in db_session.query(ImageType.name).all() ] if configuration["image_type"]: image_type = header_eval(configuration["image_type"]).lower() if image_type not in recognized_image_types: if configuration["ignore_unknown_image_types"]: return None, None raise ValueError( f"Unrecognized image type {image_type!r} " f"(expected one of {recognized_image_types})" ) else: image_type = None for test_image_type in recognized_image_types: if header_eval(configuration[f"{test_image_type}_check"]): assert image_type is None image_type = test_image_type image_type_id = ( db_session.query(ImageType.id).filter_by(name=image_type).one()[0] ) # False positive # pylint: disable=not-callable return Image(raw_fname=image_fname, image_type_id=image_type_id), image_type
# pylint: enable=not-callable
[docs] def add_images_to_db(image_collection, configuration): """Add all the images in the collection to the database.""" for image_fname in image_collection: logging.debug("Adding image %s to database", image_fname) header_eval = Evaluator(image_fname) header_eval.symtable["FULLPATH"] = image_fname _logger.debug( "Defining evaluator with keys: %s", repr(header_eval.symtable.keys()), ) # False positivie # pylint: disable=no-member with Session.begin() as db_session: # pylint: enable=no-member image, image_type = create_image( image_fname, header_eval, configuration, db_session ) if image is None: continue existing_image = ( db_session.query(Image) .filter_by(raw_fname=image.raw_fname) .one_or_none() ) image.observing_session = get_or_create_observing_session( image_type, header_eval, configuration, db_session ) if existing_image is None: db_session.add(image) else: logging.info( "Image %s already in the database with ID: %s", image.raw_fname, existing_image.id, ) assert existing_image.image_type_id == image.image_type_id assert ( existing_image.observing_session_id == image.observing_session.id )
if __name__ == "__main__": cmdline_config = parse_command_line() setup_process(task="main", **cmdline_config) add_images_to_db( find_fits_fnames(cmdline_config.pop("raw_images")), cmdline_config )