Source code for autowisp.database.initialize_database

#!/usr/bin/env python3

"""Create all datbase tables and define default configuration."""

import re
import logging

from configargparse import ArgumentParser, DefaultsFormatter
from sqlalchemy import sql, select, delete, MetaData, update

from autowisp.database.interface import db_engine, Session
from autowisp.database.data_model.base import DataModelBase

from autowisp.database.initialize_data_reduction_structure import (
    get_default_data_reduction_structure,
)
from autowisp.database.initialize_light_curve_structure import (
    get_default_light_curve_structure,
)
from autowisp import processing_steps

# false positive due to unusual importing
# pylint: disable=no-name-in-module
from autowisp.database.data_model import (
    ImageType,
    Step,
    StepDependencies,
    Parameter,
    Configuration,
    Condition,
    ConditionExpression,
    ProcessingSequence,
    MasterType,
    InputMasterTypes,
)

# pylint: enable=no-name-in-module

_logger = logging.getLogger(__name__)

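# Each ``master_info`` entry describes one type of master calibration
# product. The fields are consumed by ``add_master_dependencies()`` below:
#
#   must_match:  condition expressions (typically FITS header keywords) that
#                must evaluate to the same values for a master and the frames
#                it is applied to.
#   config_name: command-line option of the consuming step(s) through which
#                the master's file name is supplied.
#   created_by:  the (step, image type) that produces the master, or None if
#                it is supplied externally.
#   split_by:    condition expressions whose distinct values result in
#                separate masters (e.g. one per observing session).
#   used_by:     (step, image type, optional) tuples naming the steps that
#                take the master as input.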
master_info = {
    "mask": {
        "must_match": frozenset(("CAMSN", "CLRCHNL")),
        "config_name": "master-mask",
        "created_by": None,
        "split_by": frozenset(),
        "used_by": [
            ("calibrate", "zero", False),
            ("calibrate", "dark", False),
            ("calibrate", "flat", False),
            ("calibrate", "object", False),
        ],
        "description": "A bit mask indicating hot/dead/... bad pixels.",
    },
    "zero": {
        "must_match": frozenset(("CAMSN", "CLRCHNL")),
        "config_name": "master-bias",
        "created_by": ("stack_to_master", "zero"),
        "split_by": frozenset(("OBS_SESN",)),
        "used_by": [
            ("calibrate", "dark", False),
            ("calibrate", "flat", False),
            ("calibrate", "object", False),
        ],
        "description": "An estimate of the zero level of a camera.",
    },
    "dark": {
        "must_match": frozenset(("CAMSN", "CLRCHNL")),
        "config_name": "master-dark",
        "created_by": ("stack_to_master", "dark"),
        "split_by": frozenset(("OBS_SESN",)),
        "used_by": [
            ("calibrate", "flat", False),
            ("calibrate", "object", False),
        ],
        "description": "An estimate of the dark current of a camera.",
    },
    "highflat": {
        "must_match": frozenset(("CAMSN", "CLRCHNL", "INTSN")),
        "config_name": "master-flat",
        "created_by": ("stack_to_master_flat", "flat"),
        "split_by": frozenset(("OBS_SESN",)),
        "used_by": [("calibrate", "object", False)],
        "description": "An estimate of the relative sensitivity of image "
        "pixels to light from infinity entering the telescope. Constructed from"
        " flat frames with high (but not saturated) light.",
    },
    "lowflat": {
        "must_match": frozenset(("CAMSN", "CLRCHNL", "INTSN")),
        "config_name": "low-flat-master-fname",
        "created_by": ("stack_to_master_flat", "flat"),
        "split_by": frozenset(("OBS_SESN",)),
        "used_by": [],
        "description": "An estimate of the relative sensitivity of image "
        "pixels to light from infinity entering the telescope. Constructed from"
        " flat frames with low light.",
    },
    "single_photref": {
        "must_match": frozenset(("FIELD", "CLRCHNL", "EXPTIME")),
        "config_name": "single-photref-dr-fname",
        "created_by": None,
        "split_by": frozenset(),
        "used_by": [
            ("fit_magnitudes", "object", False),
            ("create_lightcurves", "object", False),
            ("epd", "object", False),
            ("generate_epd_statistics", "object", False),
            ("tfa", "object", False),
            ("generate_tfa_statistics", "object", False),
        ],
        "description": "The reference image to use to start magnitude "
        "fitting. Subsequently replaced by average of the corrected "
        "brightnes of each star.",
    },
    "master_photref": {
        "must_match": frozenset(("FIELD", "CLRCHNL", "EXPTIME")),
        "config_name": "master-photref-dr-fname",
        "created_by": ("fit_magnitudes", "object"),
        "split_by": frozenset(),
        "used_by": [("fit_magnitudes", "object", True)],
        "description": "The master photometric reference to use for magnitude "
        "fitting if available.",
    },
    "magfit_stat": {
        "must_match": frozenset(("FIELD", "CLRCHNL", "EXPTIME")),
        "config_name": "magfit-stat-fname",
        "created_by": ("fit_magnitudes", "object"),
        "split_by": frozenset(),
        "used_by": [],
        "description": "The statistics file generated during magnitude fitting.",
    },
    "magfit_catalog": {
        "must_match": frozenset(("FIELD", "CLRCHNL", "EXPTIME")),
        "config_name": "magfit-catalog-fname",
        "created_by": ("fit_magnitudes", "object"),
        "split_by": frozenset(),
        "used_by": [],
        "description": "The catalog file generated during magnitude fitting.",
    },
    "lightcurve_catalog": {
        "must_match": frozenset(("FIELD", "CLRCHNL", "EXPTIME")),
        "config_name": "lightcurve-catalog-fname",
        "created_by": ("create_lightcurves", "object"),
        "split_by": frozenset(),
        "used_by": [
            ("epd", "object", False),
            ("generate_epd_statistics", "object", False),
            ("tfa", "object", False),
            ("generate_tfa_statistics", "object", False),
        ],
        "description": "The catalog file generated for collecting lightcurves.",
    },
    "epd_stat": {
        "must_match": frozenset(("FIELD", "CLRCHNL", "EXPTIME")),
        "config_name": "epd-statistics-fname",
        "created_by": ("epd", "object"),
        "split_by": frozenset(),
        "used_by": [("tfa", "object", False)],
        "description": "The statistics file showing the performance after EPD.",
    },
    "tfa_stat": {
        "must_match": frozenset(("FIELD", "CLRCHNL", "EXPTIME")),
        "config_name": "tfa-statistics-fname",
        "created_by": ("tfa", "object"),
        "split_by": frozenset(),
        "used_by": [],
        "description": "The statistics file showing the performance after TFA.",
    },
}


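# Each entry below is (step name, image type, [(blocking step, blocking
# image type), ...]): the step may not run on frames of the given image type
# until all of the listed blocking steps have completed.
# ``init_processing()`` converts these tuples into ``ProcessingSequence``
# and ``StepDependencies`` rows.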
step_dependencies = [
    ("add_images_to_db", None, []),
    ("calibrate", "zero", []),
    ("stack_to_master", "zero", [("calibrate", "zero")]),
    ("calibrate", "dark", [("stack_to_master", "zero")]),
    ("stack_to_master", "dark", [("calibrate", "dark")]),
    (
        "calibrate",
        "flat",
        [("stack_to_master", "zero"), ("stack_to_master", "dark")],
    ),
    ("stack_to_master_flat", "flat", [("calibrate", "flat")]),
    (
        "calibrate",
        "object",
        [
            ("stack_to_master", "zero"),
            ("stack_to_master", "dark"),
        ],
    ),
    ("find_stars", "object", [("calibrate", "object")]),
    ("solve_astrometry", "object", [("find_stars", "object")]),
    (
        "fit_star_shape",
        "object",
        [("solve_astrometry", "object"), ("calibrate", "object")],
    ),
    (
        "measure_aperture_photometry",
        "object",
        [("fit_star_shape", "object"), ("calibrate", "object")],
    ),
    (
        "fit_source_extracted_psf_map",
        "object",
        [("find_stars", "object"), ("solve_astrometry", "object")],
    ),
    (
        "fit_magnitudes",
        "object",
        [
            ("solve_astrometry", "object"),
            ("fit_star_shape", "object"),
            ("measure_aperture_photometry", "object"),
            ("fit_source_extracted_psf_map", "object"),
        ],
    ),
    (
        "create_lightcurves",
        "object",
        [
            ("solve_astrometry", "object"),
            ("fit_star_shape", "object"),
            ("measure_aperture_photometry", "object"),
            ("fit_magnitudes", "object"),
            ("fit_source_extracted_psf_map", "object"),
        ],
    ),
    (
        "calculate_photref_merit",
        "object",
        [
            ("calibrate", "object"),
            ("find_stars", "object"),
            ("solve_astrometry", "object"),
            ("fit_star_shape", "object"),
            ("fit_source_extracted_psf_map", "object"),
        ],
    ),
    (
        "epd",
        "object",
        [("create_lightcurves", "object"), ("fit_magnitudes", "object")],
    ),
    (
        "generate_epd_statistics",
        "object",
        [("create_lightcurves", "object"), ("epd", "object")],
    ),
    (
        "tfa",
        "object",
        [
            ("epd", "object"),
            ("generate_epd_statistics", "object"),
            ("create_lightcurves", "object"),
            ("fit_magnitudes", "object"),
        ],
    ),
    (
        "generate_tfa_statistics",
        "object",
        [("create_lightcurves", "object"), ("tfa", "object")],
    ),
]


def get_command_line_parser():
    """Create a parser with all required command line arguments."""

    parser = ArgumentParser(
        description="Initialize the database for first time use of the "
        "pipeline.",
        formatter_class=DefaultsFormatter,
        ignore_unknown_config_file_keys=False,
    )
    parser.add_argument(
        "--config-file",
        "-c",
        is_config_file=True,
        # default=config_file,
        help="Specify a configuration file in lieu of using command line "
        "options. Any option can still be overridden on the command line.",
    )
    parser.add_argument(
        "--drop-all-tables",
        action="store_true",
        help="If passed, all pipeline tables are deleted before new ones are "
        "created.",
    )
    parser.add_argument(
        "--drop-hdf5-structure-tables",
        "--drop-structure",
        action="store_true",
        help="If passed, tables defining the structure of HDF5 files are "
        "dropped first and then re-created and filled. Otherwise, if the "
        "tables exist, their contents are not modified.",
    )
    parser.add_argument(
        "--verbose",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default="INFO",
        help="Set the verbosity of the DB logger.",
    )
    return parser


def add_default_hdf5_structures(data_reduction=True, light_curve=True):
    """
    Add a default HDF5 structure to the database.

    Args:
        data_reduction(bool): Should the structure of data reduction files
            be initialized?

        light_curve(bool): Should the structure of light curve files be
            initialized?
    """

    # False positive
    # pylint: disable=no-member
    with Session.begin() as db_session:
        # pylint: enable=no-member
        if data_reduction:
            db_session.add(get_default_data_reduction_structure())
        if light_curve:
            db_session.add(get_default_light_curve_structure(db_session))


# This is meant to function as a callable
# pylint: disable=too-few-public-methods
class StepCreator:
    """Add steps to the database one by one."""

    def __init__(self):
        """Get ready to add steps to the database."""

        # False positive
        # pylint: disable=no-member
        with Session.begin() as db_session:
            # pylint: enable=no-member
            self._step_id = 1
            self._db_parameters = {}
            default_expression = ConditionExpression(
                id=1, expression="True", notes="Default expression"
            )
            db_session.add(default_expression)
            # False positive
            # pylint: disable=not-callable
            self._default_condition = Condition(
                id=1,
                expression_id=default_expression.id,
                notes="Default configuration",
            )
            # pylint: enable=not-callable
            db_session.add(self._default_condition)

    def __call__(self, step_name, db_session):
        """Add a step with the given name to the database."""

        self._default_condition = db_session.merge(self._default_condition)
        step_module = getattr(processing_steps, step_name)
        new_step = Step(
            id=self._step_id, name=step_name, description=step_module.__doc__
        )
        self._step_id += 1
        print(f"Initializing {step_name} parameters")
        default_step_config = step_module.parse_command_line([])
        print(
            "Default step config:\n\t"
            + "\n\t".join(
                f"{param}: {value}"
                for param, value in default_step_config[
                    "argument_defaults"
                ].items()
            )
        )
        for param in default_step_config["argument_descriptions"].keys():
            if (
                param
                not in [
                    "h",
                    "config-file",
                    "extra-config-file",
                    "split-channels",
                ]
                and not param.endswith("-only-if")
                and not param.endswith("-version")
            ):
                description = default_step_config["argument_descriptions"][
                    param
                ]
                if isinstance(description, dict):
                    description = description["help"]
                configuration = None
                if param not in self._db_parameters:
                    self._db_parameters[param] = Parameter(
                        name=param, description=description
                    )
                    print(
                        f"Setting {param} = "
                        f'{default_step_config["argument_defaults"][param]}'
                    )
                    # False positive
                    # pylint: disable=not-callable
                    configuration = Configuration(
                        version=0,
                        condition_id=self._default_condition.id,
                        value=default_step_config["argument_defaults"][param],
                    )
                    # pylint: enable=not-callable
                    configuration.parameter = self._db_parameters[param]
                    db_session.add(configuration)
                new_step.parameters.append(self._db_parameters[param])
        db_session.add(new_step)
        return new_step


# pylint: enable=too-few-public-methods
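

# Usage sketch (hypothetical, mirroring how ``init_processing()`` below uses
# the class):
#
#     add_processing_step = StepCreator()
#     with Session.begin() as db_session:
#         db_step = add_processing_step("calibrate", db_session)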


def add_master_dependencies(db_session):
    """Fill the master_types table."""

    def get_imtype_id(imtype_name):
        """Return the ID of the image type with the given name."""

        return db_session.scalar(
            select(ImageType.id).where(ImageType.name == imtype_name)
        )

    def get_step_id(step_name):
        """Return the ID of the step with the given name."""

        return db_session.scalar(select(Step.id).where(Step.name == step_name))

    expressions = set()
    for master_config in master_info.values():
        expressions.update(
            master_config["must_match"], master_config["split_by"]
        )
    db_expressions = {
        expr: ConditionExpression(expression=expr) for expr in expressions
    }
    db_session.add_all(db_expressions.values())
    next_condition_id = db_session.scalar(
        # False positive
        # pylint: disable=no-member
        select(sql.functions.max(Condition.id) + 1)
        # pylint: enable=no-member
    )
    condition_ids = {}
    for master_type, master_config in master_info.items():
        print(f"Master {master_type} config: {master_config!r}")
        for condition in [
            master_config["must_match"],
            master_config["split_by"],
        ]:
            if condition and condition not in condition_ids:
                db_session.add_all(
                    [
                        # False positive
                        # pylint: disable=not-callable
                        Condition(
                            id=next_condition_id,
                            expression_id=db_expressions[expr].id,
                        )
                        # pylint: enable=not-callable
                        for expr in condition
                    ]
                )
                condition_ids[condition] = next_condition_id
                next_condition_id += 1
        db_master_type = MasterType(
            name=master_type,
            condition_id=condition_ids[master_config["must_match"]],
            maker_step_id=(
                None
                if master_config["created_by"] is None
                else get_step_id(master_config["created_by"][0])
            ),
            maker_image_type_id=(
                None
                if master_config["created_by"] is None
                else get_imtype_id(master_config["created_by"][1])
            ),
            maker_image_split_condition_id=(
                condition_ids.get(master_config["split_by"])
            ),
            description=master_config["description"],
        )
        db_session.add(db_master_type)
        for step, image_type, optional in master_config["used_by"]:
            db_session.add(
                InputMasterTypes(
                    step_id=get_step_id(step),
                    image_type_id=get_imtype_id(image_type),
                    master_type_id=db_master_type.id,
                    config_name=master_config["config_name"],
                    optional=optional,
                )
            )


# No good way to simplify
# pylint: disable=too-many-locals
def init_processing():
    """Initialize the tables controlling how processing is to be done."""

    image_type_list = []
    for processing_order in step_dependencies:
        if (
            processing_order[1] is not None
            and processing_order[1] not in image_type_list
        ):
            image_type_list.append(processing_order[1])
    add_processing_step = StepCreator()
    # False positive
    # pylint: disable=no-member
    with Session.begin() as db_session:
        # pylint: enable=no-member
        for image_type_id, image_type in enumerate(image_type_list, 1):
            db_session.add(ImageType(id=image_type_id, name=image_type))
        db_steps = {}
        for processing_id, (step_name, image_type, dependencies) in enumerate(
            step_dependencies, 1
        ):
            if step_name not in db_steps:
                db_steps[step_name] = add_processing_step(
                    step_name, db_session
                )
            if step_name not in [
                "add_images_to_db",
                "calculate_photref_merit",
            ]:
                db_session.add(
                    ProcessingSequence(
                        id=processing_id,
                        step_id=db_steps[step_name].id,
                        image_type_id=(
                            None
                            if image_type is None
                            else image_type_list.index(image_type) + 1
                        ),
                    )
                )
            for required_step, required_imtype in dependencies:
                db_session.add(
                    StepDependencies(
                        blocked_step_id=db_steps[step_name].id,
                        blocked_image_type_id=(
                            image_type_list.index(image_type) + 1
                        ),
                        blocking_step_id=db_steps[required_step].id,
                        blocking_image_type_id=(
                            image_type_list.index(required_imtype) + 1
                        ),
                    )
                )
        add_master_dependencies(db_session)


# pylint: enable=too-many-locals


def drop_tables_matching(pattern):
    """Drop tables with names matching a pre-compiled regular expression."""

    if pattern is None:
        metadata = MetaData()
        metadata.reflect(db_engine)
        metadata.drop_all(db_engine)
    else:
        DataModelBase.metadata.drop_all(
            db_engine,
            filter(
                lambda table: pattern.fullmatch(table.name),
                reversed(DataModelBase.metadata.sorted_tables),
            ),
        )
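

# Note: ``drop_tables_matching(None)`` reflects the live database and drops
# every table found there, while a compiled pattern drops only the matching
# declared tables, in reverse dependency order.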


def _overwrite_defaults(new_defaults):
    """Overwrite default values from what is defined in the steps."""

    db_conditions = {}
    db_expressions = {}
    # False positive
    # pylint: disable=no-member
    with Session.begin() as db_session:
        # pylint: enable=no-member
        for param, all_values in new_defaults.items():
            param_id = db_session.scalar(
                select(Parameter.id).filter_by(name=param)
            )
            delete_default = True
            for condition_expressions, value in all_values:
                if condition_expressions is None:
                    assert (
                        db_session.execute(
                            update(Configuration)
                            .where(Configuration.parameter_id == param_id)
                            .values(value=value)
                        ).rowcount
                        == 1
                    )
                    delete_default = False
                else:
                    for expression in condition_expressions:
                        if expression not in db_expressions:
                            db_session.add(
                                ConditionExpression(expression=expression)
                            )
                            db_expressions[expression] = db_session.scalar(
                                select(ConditionExpression).filter_by(
                                    expression=expression
                                )
                            )
                    expression_set = frozenset(
                        db_expressions[expr].id
                        for expr in condition_expressions
                    )
                    _logger.debug("Expression set: %s", repr(expression_set))
                    condition_id = db_conditions.get(expression_set)
                    _logger.debug(
                        "Corresponding condition id: %s", condition_id
                    )
                    if condition_id is None:
                        condition_id = db_session.scalar(
                            select(sql.functions.max(Condition.id) + 1)
                        )
                        _logger.debug("New condition id: %d", condition_id)
                        for expression_id in expression_set:
                            db_session.add(
                                Condition(
                                    id=condition_id,
                                    expression_id=expression_id,
                                )
                            )
                        db_session.flush()
                        db_conditions[expression_set] = condition_id
                    db_session.add(
                        Configuration(
                            parameter_id=param_id,
                            condition_id=condition_id,
                            version=0,
                            value=value,
                        )
                    )
            if delete_default:
                assert (
                    db_session.execute(
                        delete(Configuration).where(
                            Configuration.parameter_id == param_id,
                            Configuration.condition_id == 1,
                            Configuration.version == 0,
                        )
                    ).rowcount
                    == 1
                )
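

# Shape of the ``new_defaults`` argument to ``_overwrite_defaults()`` above
# (the values here are purely illustrative):
#
#     {
#         "brightness-threshold": [
#             # None as the first element: unconditionally replace the
#             # stored default value
#             (None, "1000"),
#             # otherwise: a tuple of condition expressions under which the
#             # new value applies
#             (("False",), None),
#         ],
#     }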


def initialize_database(cmdline_args, overwrite_defaults=None):
    """Initialize the database as specified on the command line."""

    if cmdline_args.drop_hdf5_structure_tables:
        drop_tables_matching(re.compile("hdf5_.*"))
    if cmdline_args.drop_all_tables:
        drop_tables_matching(re.compile(".*"))
    DataModelBase.metadata.create_all(db_engine)
    add_default_hdf5_structures()
    if not cmdline_args.drop_all_tables:
        return
    init_processing()
    if overwrite_defaults is None:
        overwrite_defaults = {
            param: [(("False",), None)]
            for param in [
                "srcfind-tool",
                "brightness-threshold",
                "filter-sources",
                "srcextract-max-sources",
            ]
        }
    _overwrite_defaults(overwrite_defaults)


if __name__ == "__main__":
    initialize_database(get_command_line_parser().parse_args())
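

# Programmatic usage sketch (hypothetical; the command-line equivalent is
# running this module with ``--drop-all-tables``):
#
#     from autowisp.database.initialize_database import (
#         get_command_line_parser,
#         initialize_database,
#     )
#
#     cmdline_args = get_command_line_parser().parse_args(
#         ["--drop-all-tables"]
#     )
#     initialize_database(cmdline_args)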