Source code for autowisp.database.lightcurve_processing

#!/usr/bin/env python3

"""Define a class that automates the processing of light curves."""
import os

if os.name == "posix":
    from os import getpgid, setsid, fork
#pylint: disable=wrong-import-position
from os import path, getpid

import logging
import sys
import subprocess

from sqlalchemy import select, and_, literal, update, sql, delete
import numpy

from autowisp.multiprocessing_util import setup_process
from autowisp import DataReductionFile, LightCurveFile
from autowisp.catalog import read_catalog_file
from autowisp.database.interface import Session
from autowisp.database.processing import ProcessingManager
from autowisp.database.user_interface import get_processing_sequence
from autowisp.light_curves.collect_light_curves import DecodingStringFormatter
from autowisp import processing_steps

# False positive due to unusual importing
# pylint: disable=no-name-in-module
from autowisp.database.data_model import (
    Image,
    ImageType,
    InputMasterTypes,
    LightCurveStatus,
    LightCurveProcessingProgress,
    MasterFile,
    MasterType,
    ProcessingSequence,
    Step,
    StepDependencies,
)

# pylint: enable=no-name-in-module
#pylint: enable=wrong-import-position


[docs] class LightCurveProcessingManager(ProcessingManager): """ Utilities for automated processing of lightcurves. Attrs: See `ProcessingManager`. _pending(dict): Keys are 'EPD' and 'TFA' and values are lists of single photometric reference filenames for which the given detrending step is pending. """
[docs] def _mark_progress(self, which, status=1, final=True): """ Record that current step has finished processing given lightcurve(s). Returns: None """ if isinstance(which, int): which = [which] assert status > 0 # False positivie # pylint: disable=no-member with Session.begin() as db_session: # pylint: enable=no-member for star in which: if isinstance(star, int): src_id = star else: with LightCurveFile(star, "r+") as light_curve: src_id = int(light_curve["Identifiers"][0][1]) if final: db_session.execute( delete(LightCurveStatus).filter_by(id=src_id) ) else: db_session.execute( update(LightCurveStatus) .where(id=src_id) .values(status=status) )
[docs] def _cleanup_interrupted(self, db_session): """Don't do anything for lightcurves."""
[docs] def _get_lc_fnames( self, *, step, db_sphotref, catalog_fname, lc_fname, db_session ): """Return the lightcurves to be processed by the current step.""" def check_add(src_id, lc_fname): """Check if the given LC exists and mark src_id started if so.""" if path.exists(lc_fname): db_session.add( LightCurveStatus( id=src_id, progress_id=self._current_processing.id, status=0, ) ) return True return False previous = db_session.scalar( select( # False positive # pylint: disable=not-callable sql.func.count(LightCurveProcessingProgress.id) # pylint: enable=not-callable ).where( LightCurveProcessingProgress.step_id == step.id, LightCurveProcessingProgress.single_photref_id == (db_sphotref.id), LightCurveProcessingProgress.configuration_version == (self.step_version[step.name]), LightCurveProcessingProgress.id != self._current_processing.id, ) ) if previous: source_list = db_session.scalars( select(LightCurveStatus.id) .join(LightCurveProcessingProgress) .where( LightCurveProcessingProgress.step_id == step.id, LightCurveProcessingProgress.single_photref_id == (db_sphotref.id), LightCurveProcessingProgress.configuration_version == (self.step_version[step.name]), ) ).all() else: self._logger.debug( "Reading LC catalog file: %s", repr(catalog_fname) ) catalog = read_catalog_file(catalog_fname) source_list = catalog.index srcid_formatter = DecodingStringFormatter() lc_fnames = map( lambda src_id: srcid_formatter.format( lc_fname, *numpy.atleast_1d(src_id) ), source_list, ) if previous: lc_fnames = list(lc_fnames) for check in lc_fnames: assert path.exists(check) return lc_fnames return [ lc for src_id, lc in zip(source_list, lc_fnames) if check_add(src_id, lc) ]
[docs] def _specialize_config( self, *, step, step_config, db_sphotref, catalog, db_session ): """Add parts of configuration for step that depend on database.""" step_config["image_type"] = self._current_image_type step_config["processing_step"] = step.name match_sphotref = f'sphotref == {db_sphotref.filename.encode("ascii")!r}' if not step_config["lc_points_filter_expression"]: step_config["lc_points_filter_expression"] = match_sphotref else: step_config["lc_points_filter_expression"] = ( f'({step_config["lc_points_filter_expression"]}) and ' + match_sphotref ) for ( option_name, input_master_type, input_master_type_id, ) in db_session.execute( select(InputMasterTypes.config_name, MasterType.name, MasterType.id) .join(MasterType) .join(ImageType, InputMasterTypes.image_type_id == ImageType.id) .where( InputMasterTypes.step_id == step.id, ImageType.name == self._current_image_type, ) ).all(): option_name = option_name.replace("-", "_") if input_master_type == "single_photref": step_config[option_name] = db_sphotref.filename elif input_master_type == "lightcurve_catalog": step_config[option_name] = catalog else: step_config[option_name] = db_session.scalar( select(MasterFile.filename) .join( LightCurveProcessingProgress, MasterFile.progress_id == LightCurveProcessingProgress.id, ) .where( LightCurveProcessingProgress.single_photref_id == db_sphotref.id ) .where(MasterFile.type_id == input_master_type_id) )
[docs] def _start_step( self, *, step, db_sphotref_image, sphotref_header, db_sphotref, db_session, ): """ Record start of processing and return the LCs and configuration to use. Args: step(Step): The database step to start. db_sphotref_image(Image): The Image database object corresponding to the single photometric reference for which processing is starting. sphotref_header(dict-like): The header of the single photometric reference. db_sphotref(MasterFile): The database MasterFile instance corresponding to the single photometric reference. db_session: Session for querying the database. Returns: []: List of the lightcurves to process dict: The complete configuration to use for the specified processing. """ self._create_current_processing( step, ("single_photref", db_sphotref.id), db_session ) catalog, step_config, lc_fname = self.get_step_config( step=step, db_sphotref=db_sphotref, db_sphotref_image=db_sphotref_image, sphotref_header=sphotref_header, db_session=db_session, ) return ( self._get_lc_fnames( step=step, db_sphotref=db_sphotref, catalog_fname=catalog, lc_fname=lc_fname, db_session=db_session, ), step_config, )
[docs] def _check_ready(self, step, image_type, single_photref_fname, db_session): """ Check if the given type of images is ready to process with given step. Args: step(Step): The step to check for readiness. image_type_id(int): The ID of the type of image to check for readiness (the image type of the single photometric reference). single_photref_fname(str): The filename of the single photometric reference identifying light curve points to process. db_session(Session): The database session to use. Returns: bool: Whether all requirements for the specified processing are satisfied. """ for required_step_name, required_imtype_id in db_session.execute( select( Step.name, StepDependencies.blocking_image_type_id, ) .select_from(StepDependencies) .join( Step, StepDependencies.blocking_step_id == Step.id, ) .where(StepDependencies.blocked_step_id == step.id) .where(StepDependencies.blocked_image_type_id == image_type.id) ).all(): assert required_imtype_id == image_type.id if ( required_step_name in self.pending and image_type.name in self.pending[required_step_name] and ( single_photref_fname in self.pending[required_step_name][image_type.name] ) ): self._logger.debug( "Not ready for %s of lightcurve points corresponding to " "single photometric reference %s because %s is pending.", step.name, repr(single_photref_fname), required_step_name, ) return False return True
[docs] def _prepare_processing( self, step_name, single_photref_fname, limit_to_steps ): """Prepare for processing images of given type by a calibration step.""" if limit_to_steps is not None and step_name not in limit_to_steps: self._logger.debug( "Skipping disabled %s for single photometric reference: %s", step_name, repr(single_photref_fname), ) return None, None # pylint: disable=no-member with ( Session.begin() as db_session, DataReductionFile(single_photref_fname, "r") as sphotref_dr, ): # pylint: enable=no-member db_sphotref = db_session.scalar( select(MasterFile).where( MasterFile.filename == single_photref_fname ) ) step = db_session.scalar(select(Step).where(Step.name == step_name)) header = sphotref_dr.get_frame_header() image = db_session.scalar( select(Image).where( Image.raw_fname.contains(header["RAWFNAME"] + ".fits") ) ) self.evaluate_expressions_image(image, db_session) self._current_image_type = image.image_type.name setup_process( task="main", parent_pid="", processing_step=step_name, image_type=self._current_image_type, **self._processing_config, ) if not self._check_ready( step, image.image_type, single_photref_fname, db_session ): return None, None return self._start_step( step=step, db_sphotref_image=image, sphotref_header=header, db_sphotref=db_sphotref, db_session=db_session, )
[docs] def __init__(self, *args, **kwargs): """Initialize self._current_image_type in addition to normali init.""" self._current_image_type = None super().__init__(*args, **kwargs) # pylint: disable=no-member with Session.begin() as db_session: # pylint: enable=no-member self.set_pending(db_session)
[docs] @staticmethod def select_step_sphotref(db_session, pending=True, full_objects=False): """Return pening or non-pending step/single photref combinations.""" master_cat_id = db_session.scalar( select(MasterType.id).where(MasterType.name == "lightcurve_catalog") ) create_lc_step_id = db_session.scalar( select(Step.id).where(Step.name == "create_lightcurves") ) return db_session.execute( select( Step if full_objects else Step.name, MasterFile if full_objects else MasterFile.filename, ) .select_from(ProcessingSequence) .join(Step) .join(MasterFile, literal(True)) .join(MasterType) .join(InputMasterTypes, InputMasterTypes.step_id == Step.id) .join(StepDependencies, StepDependencies.blocked_step_id == Step.id) .outerjoin( LightCurveProcessingProgress, and_( (LightCurveProcessingProgress.step_id == Step.id), ( LightCurveProcessingProgress.single_photref_id == MasterFile.id ), LightCurveProcessingProgress.final == True, ), ) .where(StepDependencies.blocking_step_id == create_lc_step_id) .where(MasterType.name == "single_photref") .where(InputMasterTypes.master_type_id == master_cat_id) .where( # pylint: disable=singleton-comparison LightCurveProcessingProgress.final == None if pending else LightCurveProcessingProgress.final == True # pylint: enable=singleton-comparison ) ).all()
[docs] def set_pending(self, db_session): """ Set the unprocessed images and channels split by step and image type. Args: db_session(Session): The database session to use. Returns: {step name: [str, ...]}: The filenames of the single photometric reference DR files for which lightcurves exist but the given steps has not been performend yet. """ pending = self.select_step_sphotref(db_session) print("\n\t" + "\n\t".join([repr(e) for e in pending])) for step, sphotref_fname in pending: # pylint: disable=no-member with DataReductionFile(sphotref_fname, "r") as sphotref_dr: # pylint: enable=no-member image_type_name = db_session.scalar( select(ImageType.name) .select_from(Image) .join(ImageType) .where( Image.raw_fname.contains( sphotref_dr.get_frame_header()["RAWFNAME"] + ".fits" ) ) ) if step not in self.pending: self.pending[step] = {} if image_type_name not in self.pending[step]: self.pending[step][image_type_name] = [] self.pending[step][image_type_name].append(sphotref_fname)
[docs] def get_step_config( self, *, step, db_sphotref, db_sphotref_image, sphotref_header, db_session, ): """Return the configuration to use for the given step/sphotref combo.""" matched_expressions = self._evaluated_expressions[db_sphotref_image.id][ sphotref_header["CLRCHNL"] ]["matched"] create_lc_cofig = self.get_config( matched_expressions, db_session, step_name="create_lightcurves" )[0] catalog = create_lc_cofig["lightcurve_catalog_fname"].format_map( sphotref_header ) assert path.exists(catalog) step_config = self.get_config( matched_expressions, db_session, db_step=step )[0] self._specialize_config( step=step, step_config=step_config, db_sphotref=db_sphotref, catalog=catalog, db_session=db_session, ) return catalog, step_config, create_lc_cofig["lc_fname"]
[docs] def __call__(self, limit_to_steps=None): """Perform all the processing for the given steps (all if None).""" # False positivie # pylint: disable=no-member with Session.begin() as db_session: # pylint: enable=no-member processing_sequence = [ (step.name, imtype.name) for step, imtype in get_processing_sequence(db_session) ] for step_name, imtype_name in processing_sequence: if step_name not in self.pending: continue for single_photref_fname in self.pending[step_name][imtype_name][:]: lc_fnames, configuration = self._prepare_processing( step_name, single_photref_fname, limit_to_steps ) if lc_fnames is None: continue self._logger.debug( "Starting %s on %d lightcurves for single photref %s with " "configuration:\n\t%s", step_name, len(lc_fnames), repr(single_photref_fname), "\n\t".join( f"{key}: {value!r}" for key, value in configuration.items() ), ) step_module = getattr(processing_steps, step_name) new_masters = getattr(step_module, step_name)( lc_fnames, 0, configuration, self._mark_progress ) # False positivie # pylint: disable=no-member with Session.begin() as db_session: # pylint: enable=no-member # False positive # pylint: disable=not-callable self._current_processing = db_session.merge( self._current_processing ) self._current_processing.finished = sql.func.now() self._current_processing.final = True # pylint: enable=not-callable if new_masters: self.add_masters( new_masters, step_name, self._current_image_type ) self.pending[step_name][imtype_name].remove( single_photref_fname )
[docs] def main(): """Avoid global variables.""" logging.basicConfig(level=logging.DEBUG) logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO) manager = LightCurveProcessingManager() print("Pending: " + repr(manager.pending)) manager()
if __name__ == "__main__": if os.name == "posix": # Linux/macOS try: setsid() except OSError: print(f"pid={getpid():d} pgid={getpgid(0):d}") pid = fork() if pid < 0: raise RuntimeError("fork fail") if pid != 0: sys.exit(0) setsid() main() # Run main function in child process elif os.name == "nt": # Windows try: subprocess.Popen( [sys.executable, sys.argv[0]] + sys.argv[1:], # Relaunch with same arguments creationflags=DETACHED_PROCESS, ) sys.exit(0) # Exit parent process except Exception as e: sys.stderr.write(f"Failed to detach: {e}\n") sys.exit(1)