#!/usr/bin/env python3
# pylint: disable=too-many-lines

"""Handle data processing DB interactions."""

import logging
import os

from os import path, getpid

import sys
import subprocess

from sqlalchemy import sql, select, update, and_, or_
from configargparse import ArgumentParser, DefaultsFormatter

from autowisp.multiprocessing_util import (
    setup_process,
    get_log_outerr_filenames,
)
from autowisp.database.processing import ProcessingManager
from autowisp.database.interface import Session
from autowisp.file_utilities import find_fits_fnames
from autowisp import processing_steps
from autowisp.database.user_interface import get_processing_sequence
from autowisp.data_reduction.data_reduction_file import DataReductionFile

# False positive due to unusual importing
# pylint: disable=no-name-in-module
from autowisp.database.data_model import (
    StepDependencies,
    ImageProcessingProgress,
    ProcessedImages,
    Step,
    Image,
    ObservingSession,
    MasterType,
    InputMasterTypes,
    Condition,
    ConditionExpression,
)
from autowisp.database.data_model.provenance import (
    Camera,
    CameraChannel,
    CameraType,
)

# pylint: enable=no-name-in-module


class NoMasterError(ValueError):
    """Raised when no suitable master can be found for a batch of frames."""


# Intended to be used as simple callable
# pylint: disable=too-few-public-methods
class ExpressionMatcher:
    """
    Compare condition expressions for an image/channel to a target.

    Usually checks whether the matched expressions and master expression
    values are identical, but also handles the special case of the calibrate
    step.
    """

    def _get_master_values(self, image_id, channel):
        """Return ready-to-compare master expression values."""

        if channel is None:
            return tuple(
                self._get_master_values(image_id, channel)
                for channel in sorted(
                    filter(None, self._evaluated_expressions[image_id].keys())
                )
            )
        self._logger.debug(
            "Getting master expression values for expression ids %s, "
            "image %d, channel %s",
            repr(self._master_expression_ids),
            image_id,
            channel,
        )
        return tuple(
            self._evaluated_expressions[image_id][channel]["values"][
                expression_id
            ]
            for expression_id in self._master_expression_ids
        )

    def __init__(
        self,
        evaluated_expressions,
        ref_image_id,
        ref_channel,
        master_expression_ids,
        *,
        masters_only=False,
    ):
        """Set up comparison to the given evaluated expressions."""

        self._logger = logging.getLogger(__name__)
        self._evaluated_expressions = evaluated_expressions
        self._master_expression_ids = master_expression_ids
        reference_evaluated = evaluated_expressions[ref_image_id][ref_channel]
        self._ref_matched = reference_evaluated["matched"]
        self.ref_master_values = self._get_master_values(
            ref_image_id, ref_channel
        )
        self._masters_only = masters_only
        self._logger.debug(
            "Finding images matching expressions %s and values %s",
            repr(self._ref_matched),
            repr(self.ref_master_values),
        )

    def __call__(self, image_id, channel):
        """True iff the expressions for the given image/channel match."""

        image_evaluated = self._evaluated_expressions[image_id][channel]
        image_master_values = self._get_master_values(image_id, channel)
        self._logger.debug(
            "Comparing %s to %s and %s to %s",
            repr(image_evaluated["matched"]),
            repr(self._ref_matched),
            repr(image_master_values),
            repr(self.ref_master_values),
        )
        return (
            self._masters_only
            or image_evaluated["matched"] == self._ref_matched
        ) and image_master_values == self.ref_master_values

# pylint: enable=too-few-public-methods
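

# Illustrative sketch only (not called by the pipeline): the intended calling
# pattern for ExpressionMatcher. The ``evaluated_expressions`` mapping and the
# (Image, channel, status) layout of ``pending`` follow the conventions used
# throughout this module; the function name is hypothetical.
def _example_match_pending(evaluated_expressions, pending, expression_ids):
    """Return the entries of ``pending`` matching its last entry."""

    ref_image, ref_channel, _ = pending[-1]
    matcher = ExpressionMatcher(
        evaluated_expressions,
        ref_image.id,
        ref_channel,
        expression_ids,
    )
    # Keep only image/channel combinations whose matched expressions and
    # master expression values agree with the reference entry.
    return [
        (image, channel, status)
        for image, channel, status in pending
        if matcher(image.id, channel)
    ]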


def get_master_expression_ids(step_id, image_type_id, db_session):
    """
    List all condition expression IDs determining input or output masters.

    Args:
        step_id(int):    The ID of the step for which to return the master
            expression IDs.

        image_type_id(int):    The type of images being processed by the step
            for which to return the master expression IDs.

    Returns:
        [int]:
            The combined expression IDs required to determine which required
            masters can be used for the given step or which masters will be
            created by it.
    """

    return sorted(
        set(
            db_session.scalars(
                select(ConditionExpression.id)
                .select_from(InputMasterTypes)
                .join(MasterType)
                .join(
                    Condition,
                    # False positive
                    # pylint: disable=no-member
                    MasterType.condition_id == Condition.id,
                    # pylint: enable=no-member
                )
                .join(ConditionExpression)
                .where(InputMasterTypes.step_id == step_id)
                .where(InputMasterTypes.image_type_id == image_type_id)
                .group_by(ConditionExpression.id)
            ).all()
            + db_session.scalars(
                select(ConditionExpression.id)
                .select_from(MasterType)
                .join(
                    Condition,
                    or_(
                        # False positive
                        # pylint: disable=no-member
                        MasterType.condition_id == Condition.id,
                        (
                            MasterType.maker_image_split_condition_id
                            == Condition.id
                        ),
                        # pylint: enable=no-member
                    ),
                )
                .join(ConditionExpression)
                .where(MasterType.maker_step_id == step_id)
                .where(MasterType.maker_image_type_id == image_type_id)
            ).all()
        )
    )
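

# A minimal usage sketch (IDs hypothetical): the returned list is sorted, so
# master expression values collected with it are comparable across batches
# processed by the same step.
#
#     with Session.begin() as db_session:
#         expression_ids = get_master_expression_ids(
#             step_id=3, image_type_id=1, db_session=db_session
#         )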


def remove_failed_prerequisite(
    pending, pending_image_type_id, prereq_step_id, db_session
):
    """Remove from pending any entries that failed the prerequisite step."""

    prereq_statuses = [
        db_session.execute(
            select(ProcessedImages.status)
            .outerjoin(ImageProcessingProgress)
            .where(
                ProcessedImages.image_id == image.id,
                ProcessedImages.channel == channel,
                ImageProcessingProgress.step_id == prereq_step_id,
                (
                    ImageProcessingProgress.image_type_id
                    == pending_image_type_id
                ),
            )
        ).scalar_one_or_none()
        for image, channel, _ in pending
    ]
    dropped = []
    # Iterate backwards so pop() does not shift the unvisited indices.
    for i in range(len(pending) - 1, -1, -1):
        if prereq_statuses[i] and prereq_statuses[i] < 0:
            dropped.append(pending.pop(i))
    return dropped
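

# Sketch of intended use (names hypothetical): ``pending`` is modified in
# place and the dropped (Image, channel, status) entries are returned.
#
#     dropped = remove_failed_prerequisite(
#         pending, image_type_id, prereq_step_id, db_session
#     )
#     for image, channel, _ in dropped:
#         print(f"Dropped {image.raw_fname}:{channel}")

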
# pylint: disable=too-many-instance-attributes
class ImageProcessingManager(ProcessingManager):
    """
    Read configuration and record processing progress in the database.

    Attrs:
        See `ProcessingManager`.

        pending(dict):    Indexed by step ID and image type ID: a list of
            (Image, channel name, status) tuples listing all the images of
            the given type that have not been processed by the currently
            selected version of the step in the key, along with their status
            if previous processing by that step was interrupted, or None if
            not.

        _failed_dependencies(dict):    Dictionary with keys
            (step, image_type) that contains the list of images and channels
            that failed the given step.
    """

    def _set_calibration_config(self, config, first_image):
        """Return the specially formatted argument for the calibration step."""

        config["split_channels"] = self._get_split_channels(first_image)
        config["extra_header"] = {
            "OBS-SESN": first_image.observing_session.label
        }
        result = {
            (
                "split_channels",
                "".join(
                    repr(c)
                    for c in first_image.observing_session.camera.channels
                ),
            ),
            ("observing_session", config["extra_header"]["OBS-SESN"]),
        }
        self._logger.debug(
            "Calibration step configuration:\n%s",
            "\n\t".join(f"{k}: {v!r}" for k, v in config.items()),
        )
        return result

    def _split_by_master(self, batch, input_master_type):
        """Split the given list of images by the best master of given type."""

        result = {}
        for image, channel, status in batch:
            if channel is None:
                best_master = tuple(
                    (
                        channel,
                        self._evaluated_expressions[image.id][channel][
                            "masters"
                        ][input_master_type.master_type.name],
                    )
                    for channel in sorted(
                        filter(
                            None, self._evaluated_expressions[image.id].keys()
                        )
                    )
                )
            else:
                best_master = self._evaluated_expressions[image.id][channel][
                    "masters"
                ][input_master_type.master_type.name]
            if best_master in result:
                result[best_master].append((image, channel, status))
            else:
                result[best_master] = [(image, channel, status)]
        return result
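
    # Shape of the value returned by _split_by_master (filenames
    # hypothetical):
    #
    #     {
    #         "masterbias_2024.fits": [(image1, "R", None), ...],
    #         "masterbias_2025.fits": [(image2, "R", 1), ...],
    #     }
    #
    # For channel-less (raw) batches the keys are instead tuples of
    # (channel, best master) pairs, one per channel.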

    # Could not find good way to simplify
    # pylint: disable=too-many-locals
    def _get_batch_config(
        self, batch, master_expression_values, step, db_session
    ):
        """
        Split given batch of images by configuration for given step.

        The batch must already be split by all relevant condition
        expressions. Only splits batches by the best master for each image.

        Args:
            batch([Image, channel, status]):    List of database image
                instances and channels for which to find the
                configuration(s). The channel should be ``None`` for the
                ``calibrate`` step.

            master_expression_values(tuple):    The values of the expressions
                required to select input masters or to guarantee a unique
                output master. Should be provided in consistent order for all
                batches processed by the same step.

            step(Step):    The database step instance to configure.

            db_session:    Database session to use for queries.

        Returns:
            dict:
                keys: guaranteed to match iff configuration, output master
                    conditions, and all best input master(s) match. In other
                    words, if this function is called separately on multiple
                    batches, it is safe to combine and process together those
                    that end up with the same key.

                values:
                    dict: The configuration to use for the given (sub-)batch.

                    [Image]: The (sub-)batch of images to process with given
                        configuration.
        """

        self._logger.debug("Finding configuration for batch: %s", repr(batch))
        first_image_expressions = self._evaluated_expressions[batch[0][0].id]
        config, config_key = self.get_config(
            first_image_expressions[batch[0][1]]["matched"],
            db_session,
            db_step=step,
        )
        config_key |= {master_expression_values}
        if step.name == "calibrate":
            config_key |= self._set_calibration_config(config, batch[0][0])
        config["processing_step"] = step.name
        config["image_type"] = batch[0][0].image_type.name
        result = {config_key: (config, batch)}
        for input_master_type in db_session.scalars(
            select(InputMasterTypes).filter_by(
                step_id=step.id, image_type_id=batch[0][0].image_type_id
            )
        ).all():
            for config_key, (config, sub_batch) in list(result.items()):
                del result[config_key]
                splits = self._split_by_master(sub_batch, input_master_type)
                for best_master, sub_batch in splits.items():
                    if best_master is None:
                        if input_master_type.optional:
                            assert config_key not in result
                            result[config_key] = (config, sub_batch)
                        else:
                            result[None] = (
                                "No master "
                                + input_master_type.master_type.name
                                + " found!",
                                sub_batch,
                            )
                    else:
                        new_config = dict(config)
                        new_config[
                            input_master_type.config_name.replace("-", "_")
                        ] = (
                            best_master
                            if isinstance(best_master, str)
                            else dict(best_master)
                        )
                        key_extra = {
                            (input_master_type.config_name, best_master)
                        }
                        result[config_key | key_extra] = (
                            new_config,
                            sub_batch,
                        )
        return result
    # pylint: enable=too-many-locals

    def _clean_pending_per_dependencies(
        self, db_session, from_step_id=None, from_image_type_id=None
    ):
        """Remove pending images from steps if they failed a required step."""

        dropped = {}
        for (step_id, image_type_id), pending in self.pending.items():
            if (
                from_image_type_id is not None
                and image_type_id != from_image_type_id
            ):
                continue
            for prereq_step_id in db_session.scalars(
                select(StepDependencies.blocking_step_id).where(
                    StepDependencies.blocked_step_id == step_id,
                    StepDependencies.blocked_image_type_id == image_type_id,
                    StepDependencies.blocking_image_type_id == image_type_id,
                )
            ):
                if from_step_id is not None and prereq_step_id != from_step_id:
                    continue
                if (step_id, image_type_id) not in dropped:
                    dropped[(step_id, image_type_id)] = []
                failed_prereq = remove_failed_prerequisite(
                    pending, image_type_id, prereq_step_id, db_session
                )
                self.pending[(step_id, image_type_id)] = pending
                dropped[(step_id, image_type_id)].extend(failed_prereq)
                self._logger.info(
                    "The following image/channel combinations failed %s. "
                    "Excluding from %s:\n\t%s",
                    db_session.scalar(
                        select(Step.name).filter_by(id=prereq_step_id)
                    ),
                    db_session.scalar(select(Step.name).filter_by(id=step_id)),
                    "\n\t".join(
                        image.raw_fname + ":" + channel
                        # Entries are (Image, channel, status) triples.
                        for image, channel, _ in failed_prereq
                    ),
                )
        return dropped

    def _check_ready(self, step, image_type, db_session):
        """
        Check if the given type of images is ready to process with given step.

        Args:
            step(Step):    The step to check for readiness.

            image_type(ImageType):    The type of images to check for
                readiness.

            db_session(Session):    The database session to use.

        Returns:
            bool:
                Whether all requirements for the specified processing are
                satisfied.
        """

        for requirement in db_session.execute(
            select(
                StepDependencies.blocking_step_id,
                StepDependencies.blocking_image_type_id,
            )
            .where(StepDependencies.blocked_step_id == step.id)
            .where(StepDependencies.blocked_image_type_id == image_type.id)
        ).all():
            if self.pending[requirement]:
                self._logger.debug(
                    "Not ready for %s of %d %s frames because of %d pending "
                    "%s type ID images for step ID %s:\n\t%s",
                    step.name,
                    len(self.pending[(step.id, image_type.id)]),
                    image_type.name,
                    len(self.pending[requirement]),
                    requirement[1],
                    requirement[0],
                    "\n\t".join(
                        f"{e[0]!r}: {e[1]!r}"
                        for e in self.pending[requirement]
                    ),
                )
                return False
        return True

    def _get_interrupted(self, need_cleanup, db_session):
        """Return list of interrupted files and configuration for cleanup."""

        self.current_step = need_cleanup[0][2]
        self._current_processing = db_session.scalar(
            select(ImageProcessingProgress).where(
                ImageProcessingProgress.id == need_cleanup[0][1].progress_id
            )
        )
        input_type = getattr(
            processing_steps, self.current_step.name
        ).input_type
        for entry in need_cleanup:
            assert entry[2] == self.current_step
        pending = [
            (
                image,
                None if input_type == "raw" else processed.channel,
                processed.status,
            )
            for image, processed, _ in need_cleanup
        ]
        for image, _, __ in need_cleanup:
            if image.id not in self._evaluated_expressions:
                self.evaluate_expressions_image(image, db_session)
        cleanup_batches = self._get_config_batches(
            pending, input_type, db_session
        )
        result = {}
        for (config_key, status), (config, batch) in cleanup_batches.items():
            if config_key not in result:
                result[config_key] = (config, [])
            result[config_key][1].extend([(fname, status) for fname in batch])
        return list(result.values())

    def _cleanup_interrupted(self, db_session):
        """Cleanup previously interrupted processing for the current step."""

        need_cleanup = db_session.execute(
            select(Image, ProcessedImages, Step)
            .join(ProcessedImages)
            .join(ImageProcessingProgress)
            .join(Step)
            .where(~ProcessedImages.final)
            .order_by(Step.name)
        ).all()
        if not need_cleanup:
            return
        step_module = getattr(processing_steps, need_cleanup[0][2].name)
        for config, interrupted in self._get_interrupted(
            need_cleanup, db_session
        ):
            self._logger.warning(
                "Cleaning up interrupted %s processing of %d images:\n"
                "%s\n"
                "config: %s",
                need_cleanup[0][2],
                len(interrupted),
                repr(interrupted),
                repr(config),
            )
            new_status = step_module.cleanup_interrupted(interrupted, config)
            for _, processed, _ in need_cleanup:
                assert new_status >= -1
                assert new_status <= processed.status
                if new_status == -1:
                    db_session.delete(processed)
                else:
                    processed.status = new_status

    def _init_processed_ids(self, image, channels, step_input_type):
        """Prepare to record processing of the given image by current step."""

        if channels == [None]:
            channels = self._evaluated_expressions[image.id].keys()
        for channel_name in channels:
            if channel_name is None:
                continue
            step_input_fname = self.get_step_input(
                image, channel_name, step_input_type
            )
            if step_input_fname not in self._processed_ids:
                self._processed_ids[step_input_fname] = []
            self._processed_ids[step_input_fname].append(
                {"image_id": image.id, "channel": channel_name}
            )

    def _start_step(self, step, image_type, db_session):
        """
        Record the start of a processing step and return the images to process.

        Args:
            step(Step):    The database step to start.

            image_type(ImageType):    The database type of image to start
                processing.

            db_session:    Active session for database queries.

        Returns:
            [(Image, str)]:
                The list of images and channels to process.

            str:
                The type of input expected by the current step.
        """

        self._create_current_processing(
            step, ("image_type", image_type.id), db_session
        )
        pending_images = self.pending[(step.id, image_type.id)].copy()
        for image, channel, status in self._failed_dependencies.get(
            (step.id, image_type.id), []
        ):
            self._logger.info(
                "Prerequisite failed for %s of %s", step.name, image
            )
            db_session.add(
                ProcessedImages(
                    image_id=image.id,
                    channel=channel,
                    progress_id=self._current_processing.id,
                    status=-1,
                    final=True,
                )
            )
            self._some_failed = True
        self._processed_ids = {}
        step_input_type = getattr(processing_steps, step.name).input_type
        if step_input_type == "raw":
            # Raw processing works on whole images, so collapse the
            # per-channel entries to a single entry per image.
            added = set()
            new_pending = []
            for image, _, status in pending_images:
                if image.id not in added:
                    added.add(image.id)
                    new_pending.append((image, None, status))
            pending_images = new_pending
        for image, channel_name, _ in pending_images:
            self.evaluate_expressions_image(image, db_session)
            self._init_processed_ids(image, [channel_name], step_input_type)
        self._logger.info(
            "Starting %s step for %d %s images",
            self.current_step.name,
            len(pending_images),
            image_type.name,
        )
        return pending_images, step_input_type

    def _process_batch(
        self, batch, *, start_status, config, step_name, image_type_name
    ):
        """Run the current step for a batch of images given configuration."""

        step_module = getattr(processing_steps, step_name)
        new_masters = getattr(step_module, step_name)(
            batch,
            start_status,
            config,
            self._start_processing,
            self._end_processing,
        )
        if new_masters:
            self.add_masters(new_masters, step_name, image_type_name)

    def _start_processing(self, input_fname, status=0):
        """
        Mark in the database that processing the given file has begun.

        Args:
            input_fname:    The filename of the input (DR or FITS) that is
                about to begin processing.

        Returns:
            None
        """

        assert self.current_step is not None
        assert self._current_processing is not None
        self._logger.debug(
            "Starting processing IDs: %s",
            repr(self._processed_ids[input_fname]),
        )
        # False positive
        # pylint: disable=no-member
        with Session.begin() as db_session:
            # pylint: enable=no-member
            for starting_id in self._processed_ids[input_fname]:
                db_session.add(
                    ProcessedImages(
                        **starting_id,
                        progress_id=self._current_processing.id,
                        status=status,
                        final=False,
                    )
                )

    def _end_processing(self, input_fname, status=1, final=True):
        """
        Record that the current step has finished processing the given file.

        Args:
            input_fname:    The filename of the input (DR or FITS) that was
                processed.

        Returns:
            None
        """

        assert self.current_step is not None
        assert self._current_processing is not None
        assert status != -1
        if status < 0:
            self._some_failed = True
        self._logger.debug(
            "Finished processing %s", repr(self._processed_ids[input_fname])
        )
        # False positive
        # pylint: disable=no-member
        with Session.begin() as db_session:
            # pylint: enable=no-member
            for finished_id in self._processed_ids[input_fname]:
                db_session.execute(
                    update(ProcessedImages)
                    .where(ProcessedImages.image_id == finished_id["image_id"])
                    .where(ProcessedImages.channel == finished_id["channel"])
                    .where(
                        ProcessedImages.progress_id
                        == self._current_processing.id
                    )
                    .values(status=status, final=final)
                )

    # No good way to simplify
    # pylint: disable=too-many-locals
    def _get_config_batches(self, pending_images, step_input_type, db_session):
        """Return the batches of images to process with identical config."""

        result = {}
        check_image_type_id = pending_images[0][0].image_type_id
        for (
            by_condition,
            master_expression_values,
        ) in self.group_pending_by_conditions(
            pending_images,
            db_session,
            match_observing_session=self.current_step.name == "calibrate",
        ):
            for config_key, (config, batch) in self._get_batch_config(
                by_condition,
                master_expression_values,
                self.current_step,
                db_session,
            ).items():
                if config_key is None:
                    self._logger.warning(
                        "Excluding the following images from %s:\n\t%s",
                        config,
                        "\n\t".join(
                            [
                                self.get_step_input(
                                    image, channel, step_input_type
                                )
                                for image, channel, _ in batch
                            ]
                        ),
                    )
                    continue
                for image, channel, status in batch:
                    assert image.image_type_id == check_image_type_id
                    if (config_key, status) not in result:
                        result[config_key, status] = (config, [])
                    result[config_key, status][1].append(
                        self.get_step_input(image, channel, step_input_type)
                    )
        return result
    # pylint: enable=too-many-locals
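
    # Shape of the value returned by _get_config_batches (filenames and key
    # contents hypothetical): sub-batches are keyed by (config_key, starting
    # status) and hold the shared configuration plus the step input files:
    #
    #     {
    #         (<config_key>, None): (config, ["raw/img1.fits", ...]),
    #         (<config_key>, 1): (config, ["raw/img2.fits"]),
    #     }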

    def _prepare_processing(self, step, image_type, limit_to_steps):
        """Prepare for processing images of given type by a calibration step."""

        # pylint: disable=no-member
        with Session.begin() as db_session:
            # pylint: enable=no-member
            setup_process(
                task="main",
                parent_pid="",
                processing_step=step.name,
                image_type=image_type.name,
                **self._processing_config,
            )
            step = db_session.merge(step)
            image_type = db_session.merge(image_type)
            self.set_pending(db_session, [(step, image_type)])
            if limit_to_steps is not None and step.name not in limit_to_steps:
                self._logger.debug(
                    "Skipping disabled %s for %s frames",
                    step.name,
                    image_type.name,
                )
                return step.name, image_type.name, None
            if not self._check_ready(step, image_type, db_session):
                return step.name, image_type.name, None
            pending_images, step_input_type = self._start_step(
                step, image_type, db_session
            )
            if not pending_images:
                return step.name, image_type.name, None
            return (
                step.name,
                image_type.name,
                self._get_config_batches(
                    pending_images, step_input_type, db_session
                ),
            )

    def _finalize_processing(self):
        """Update database and instance after processing."""

        # pylint: disable=no-member
        with Session.begin() as db_session:
            # pylint: enable=no-member
            self._current_processing = db_session.merge(
                self._current_processing
            )
            self._current_processing.finished = (
                # False positive
                # pylint: disable=not-callable
                sql.func.now()
                # pylint: enable=not-callable
            )
            pending = self.pending[
                (
                    self._current_processing.step_id,
                    self._current_processing.image_type_id,
                )
            ]
            self._logger.info(
                "Removing from pending all successful images for "
                "progress: %s",
                self._current_processing,
            )
            for finished_image_id, finished_channel in db_session.execute(
                select(ProcessedImages.image_id, ProcessedImages.channel)
                .where(
                    ProcessedImages.progress_id == self._current_processing.id
                )
                .where(
                    # pylint: disable=singleton-comparison
                    ProcessedImages.final == True
                    # pylint: enable=singleton-comparison
                )
                .where(
                    or_(
                        ProcessedImages.status > 0,
                        ProcessedImages.status < -1,
                    )
                )
            ).all():
                found = False
                for i, (image, channel, _) in enumerate(pending):
                    if (
                        image.id == finished_image_id
                        and channel == finished_channel
                    ):
                        assert not found
                        del pending[i]
                        found = True
                        break
                if not found:
                    self._logger.error(
                        "Completed image ID %d, channel %s not found in "
                        "pending for step ID %d, image type ID %d:\n\t%s",
                        finished_image_id,
                        finished_channel,
                        self._current_processing.step_id,
                        self._current_processing.image_type_id,
                        "\n\t".join(f"{e[0]!r}: {e[1]!r}" for e in pending),
                    )
                    raise RuntimeError("Finished non-pending image!")
            self.pending[
                (
                    self._current_processing.step_id,
                    self._current_processing.image_type_id,
                )
            ] = pending

            # if self._some_failed:
            #     dropped = self._clean_pending_per_dependencies(
            #         db_session,
            #         self._current_processing.step_id,
            #         self._current_processing.image_type_id,
            #     )
            #     for step_imtype, dropped_images in dropped.items():
            #         if step_imtype in self._failed_dependencies:
            #             self._failed_dependencies[step_imtype].extend(
            #                 dropped_images
            #             )
            #         else:
            #             self._failed_dependencies[step_imtype] = (
            #                 dropped_images
            #             )

    def __init__(self, *args, **kwargs):
        """Initialize self._failed_dependencies in addition to normal init."""

        self._failed_dependencies = {}
        super().__init__(*args, **kwargs)

    def get_step_input(self, image, channel_name, step_input_type):
        """Return the name of the file required by the current step."""

        if step_input_type == "raw":
            return image.raw_fname
        if step_input_type.startswith("calibrated"):
            return self._evaluated_expressions[image.id][channel_name][
                "calibrated"
            ]
        if step_input_type == "dr":
            return self._evaluated_expressions[image.id][channel_name]["dr"]
        raise ValueError(f"Invalid step input type {step_input_type}")

    def set_pending(self, db_session, steps_imtypes=None, invert=False):
        """
        Set the unprocessed images and channels split by step and image type.

        Set the self.pending attribute to a dictionary with format
        ``{(step.id, image_type.id): [(Image, channel, status)]}``,
        containing the images and channels of the specified type to which the
        specified step has not been applied with the current configuration.

        Args:
            db_session(Session):    The database session to use.

            steps_imtypes(Step, ImageType):    The step/image type
                combinations to determine pending images for. If unspecified,
                the full processing sequence defined in the database is used.

            invert(bool):    If True, returns successfully completed (not
                failed) images instead of pending ones.

        Returns:
            None
        """

        status_select = (
            select(
                ProcessedImages.image_id,
                ProcessedImages.channel,
                sql.func.max(ProcessedImages.status).label("status"),
            )
            .join(ImageProcessingProgress)
            .where(ProcessedImages.status > 0)
            .where(ProcessedImages.final == 0)
            .group_by(ProcessedImages.image_id, ProcessedImages.channel)
        )
        for step, image_type in steps_imtypes or get_processing_sequence(
            db_session
        ):
            failed_prereq_subquery = (
                select(ProcessedImages.image_id, ProcessedImages.channel)
                .select_from(StepDependencies)
                .join(
                    ImageProcessingProgress,
                    and_(
                        StepDependencies.blocking_step_id
                        == ImageProcessingProgress.step_id,
                        StepDependencies.blocking_image_type_id
                        == ImageProcessingProgress.image_type_id,
                    ),
                )
                .join(ProcessedImages)
                .where(StepDependencies.blocked_step_id == step.id)
                .where(StepDependencies.blocked_image_type_id == image_type.id)
                .where(ProcessedImages.status < 0)
                .group_by(ProcessedImages.image_id, ProcessedImages.channel)
                .subquery()
            )
            processed_subquery = (
                select(ProcessedImages.image_id, ProcessedImages.channel)
                .join(ImageProcessingProgress)
                .where(ImageProcessingProgress.step_id == step.id)
                .where(ImageProcessingProgress.image_type_id == image_type.id)
                .where(
                    ImageProcessingProgress.configuration_version
                    == self.step_version[step.name]
                )
                .where(ProcessedImages.final)
            )
            status_subquery = (
                status_select.where(ImageProcessingProgress.step_id == step.id)
                .where(ImageProcessingProgress.image_type_id == image_type.id)
                .where(
                    ImageProcessingProgress.configuration_version
                    == self.step_version[step.name]
                )
                .subquery()
            )
            if invert:
                processed_subquery = processed_subquery.where(
                    ProcessedImages.status > 0
                )
            processed_subquery = processed_subquery.subquery()
            query = (
                select(Image, CameraChannel.name, status_subquery.c.status)
                .join(ObservingSession)
                .join(Camera)
                .join(CameraType)
                .join(CameraChannel)
                .outerjoin(
                    processed_subquery,
                    # False positive
                    # pylint: disable=no-member
                    and_(
                        Image.id == processed_subquery.c.image_id,
                        CameraChannel.name == processed_subquery.c.channel,
                    ),
                    # pylint: enable=no-member
                )
                .outerjoin(
                    failed_prereq_subquery,
                    and_(
                        Image.id == failed_prereq_subquery.c.image_id,
                        CameraChannel.name == failed_prereq_subquery.c.channel,
                    ),
                )
                .outerjoin(
                    status_subquery,
                    and_(
                        Image.id == status_subquery.c.image_id,
                        CameraChannel.name == status_subquery.c.channel,
                    ),
                )
                .where(
                    # False positive
                    # pylint: disable=no-member
                    Image.image_type_id == image_type.id
                    # pylint: enable=no-member
                )
            )
            # This is how NULL comparison is done in SQLAlchemy
            # pylint: disable=singleton-comparison
            if invert:
                query = query.where(processed_subquery.c.image_id != None)
            else:
                query = query.where(processed_subquery.c.image_id == None)
            self.pending[(step.id, image_type.id)] = db_session.execute(
                query.where(failed_prereq_subquery.c.image_id == None)
            ).all()
            self._failed_dependencies[(step.id, image_type.id)] = (
                db_session.execute(
                    query.where(failed_prereq_subquery.c.image_id != None)
                ).all()
            )
            # pylint: enable=singleton-comparison
            self._logger.debug(
                "%s is pending for %d and failed dependencies for %d %s "
                "images",
                step.name,
                len(self.pending[(step.id, image_type.id)]),
                len(self._failed_dependencies[(step.id, image_type.id)]),
                image_type.name,
            )
        self._logger.debug("Pending: %s", repr(self.pending))
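
    # After set_pending() the bookkeeping attributes look like this (IDs,
    # channel names, and entries hypothetical):
    #
    #     self.pending = {
    #         (step_id, image_type_id): [(<Image>, "R", None), ...],
    #         ...
    #     }
    #     self._failed_dependencies = {
    #         (step_id, image_type_id): [(<Image>, "B", None), ...],
    #         ...
    #     }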

    def group_pending_by_conditions(
        self,
        pending_images,
        db_session,
        *,
        match_observing_session=False,
        step_id=None,
        masters_only=False,
    ):
        """
        Group pending_images by condition expression values.

        Args:
            pending_images([Image, str]):    A list of the images (instances
                of the Image DB class) and channels to group.

            db_session:    Database session to use for queries.

            match_observing_session:    Whether each group of images needs to
                be from the same observing session.

            step_id(int):    The ID of the step for which to group the
                pending images. If not specified, defaults to the current
                step.

            masters_only:    If True, grouping is done only by the values of
                expressions required to determine the input or output masters
                for the current step.

        Returns:
            [([Image, str], tuple)]:
                Each entry contains a list of the image/channel combinations
                matching a unique set of conditions and the master expression
                values shared by all images in the list.
        """

        image_type_id = pending_images[0][0].image_type_id
        result = []
        master_expression_ids = get_master_expression_ids(
            step_id or self.current_step.id, image_type_id, db_session
        )
        while pending_images:
            self._logger.debug(
                "Finding images matching the same expressions as image id "
                "%d, channel %s",
                pending_images[-1][0].id,
                pending_images[-1][1],
            )
            batch = []
            match_expressions = ExpressionMatcher(
                self._evaluated_expressions,
                pending_images[-1][0].id,
                pending_images[-1][1],
                master_expression_ids,
                masters_only=masters_only,
            )
            observing_session_id = pending_images[-1][0].observing_session_id
            for i in range(len(pending_images) - 1, -1, -1):
                if (
                    not match_observing_session
                    or pending_images[i][0].observing_session_id
                    == observing_session_id
                ) and match_expressions(
                    pending_images[i][0].id, pending_images[i][1]
                ):
                    batch.append(pending_images.pop(i))
                else:
                    self._logger.debug("Not a match")
            self._logger.debug(
                "Image batch:\n\t%s",
                "\n\t".join(
                    f"{image.raw_fname}: {channel} status {status}"
                    for image, channel, status in batch
                ),
            )
            result.append((batch, match_expressions.ref_master_values))
        return result
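
    # Usage sketch (within the class, names as defined above): grouping all
    # pending images of one type into condition-homogeneous batches before
    # configuring them.
    #
    #     for batch, master_values in self.group_pending_by_conditions(
    #         pending_images, db_session
    #     ):
    #         configs = self._get_batch_config(
    #             batch, master_values, self.current_step, db_session
    #         )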

    def find_processing_outputs(self, processing_progress, db_session=None):
        """Return all logging and output filenames for given processing ID."""

        if db_session is None:
            # False positive
            # pylint: disable=no-member
            # pylint: disable=redefined-argument-from-local
            with Session.begin() as db_session:
                # pylint: enable=no-member
                # pylint: enable=redefined-argument-from-local
                return self.find_processing_outputs(
                    processing_progress, db_session
                )
        if not isinstance(processing_progress, ImageProcessingProgress):
            return self.find_processing_outputs(
                db_session.scalar(
                    select(ImageProcessingProgress).filter_by(
                        id=processing_progress
                    )
                ),
                db_session,
            )
        main_fnames = get_log_outerr_filenames(
            existing_pid=processing_progress.process_id,
            task="*",
            parent_pid="",
            processing_step=processing_progress.step.name,
            image_type=processing_progress.image_type.name,
            **self._processing_config,
        )
        logging.info("Main fnames: %s", repr(main_fnames))
        assert len(main_fnames[0]) == len(main_fnames[1]) == 1
        return (
            tuple(fname[0] for fname in main_fnames),
            get_log_outerr_filenames(
                existing_pid="*",
                task="*",
                parent_pid=processing_progress.process_id,
                processing_step=processing_progress.step.name,
                image_type=processing_progress.image_type.name,
                **self._processing_config,
            ),
        )

    def __call__(self, limit_to_steps=None):
        """Perform all the processing for the given steps (all if None)."""

        # False positive
        # pylint: disable=no-member
        with Session.begin() as db_session:
            # pylint: enable=no-member
            processing_sequence = get_processing_sequence(db_session)
        DataReductionFile.get_file_structure()
        for step, image_type in processing_sequence:
            (step_name, image_type_name, processing_batches) = (
                self._prepare_processing(step, image_type, limit_to_steps)
            )
            self._logger.debug(
                "At start of %s step for %s images, pending:\n\t%s",
                step_name,
                image_type_name,
                "\n\t".join(
                    f"{key!r}: {len(val)}"
                    for key, val in self.pending.items()
                ),
            )
            if processing_batches is None:
                continue
            self._finalize_processing()
            for (_, start_status), (
                config,
                batch,
            ) in processing_batches.items():
                # False positive
                # pylint: disable=no-member
                with Session.begin() as db_session:
                    # pylint: enable=no-member
                    self._create_current_processing(
                        step, ("image_type", image_type.id), db_session
                    )
                self._logger.debug(
                    "Starting %s for a batch of %d %s images from status %s "
                    "with config:\n%s",
                    step_name,
                    len(batch),
                    image_type_name,
                    start_status,
                    repr(config),
                )
                self._process_batch(
                    batch,
                    start_status=start_status,
                    config=config,
                    step_name=step_name,
                    image_type_name=image_type_name,
                )
                self._logger.debug(
                    "Processed %s batch of %d images.", step_name, len(batch)
                )
                self._finalize_processing()
                self._logger.debug(
                    "After processing batch, pending:\n\t%s",
                    "\n\t".join(
                        f"{key!r}: {len(val)}"
                        for key, val in self.pending.items()
                    ),
                )
            self._some_failed = False

    def add_raw_images(self, image_collection):
        """Add the given RAW images to the database for processing."""

        # pylint: disable=no-member
        with Session.begin() as db_session:
            # pylint: enable=no-member
            default_expression_id = db_session.scalar(
                select(ConditionExpression.id).where(
                    ConditionExpression.notes == "Default expression"
                )
            )
            configuration = self.get_config(
                {default_expression_id},
                db_session,
                step_name="add_images_to_db",
            )[0]
            processing_steps.add_images_to_db.add_images_to_db(
                image_collection, configuration
            )
# pylint: enable=too-many-instance-attributes


def parse_command_line():
    """Return the command line configuration."""

    parser = ArgumentParser(
        description="Manually invoke the fully automated processing",
        default_config_files=[],
        formatter_class=DefaultsFormatter,
        ignore_unknown_config_file_keys=False,
    )
    parser.add_argument(
        "--add-raw-images",
        "-i",
        nargs="+",
        default=[],
        help="Before processing add new raw images for processing. Can be "
        "specified as a combination of image files and directories which "
        "will be searched for FITS files.",
    )
    parser.add_argument(
        "--steps",
        nargs="+",
        default=None,
        help="Process using only the specified steps. Leave empty for full "
        "processing.",
    )
    parser.add_argument(
        "--detached",
        action="store_true",
        help="Indicates that the script is running as a detached process.",
    )
    # Parse once and reuse the result instead of re-parsing sys.argv.
    config = parser.parse_args()
    logging.info("Parsed arguments: %s", config)
    return config
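

# Example invocations (paths hypothetical):
#
#     python -m autowisp.database.image_processing -i /data/raw/night1
#     python -m autowisp.database.image_processing --steps calibrate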


def main(config):
    """Avoid global variables."""

    logging.basicConfig(level=logging.DEBUG)
    logging.getLogger("sqlalchemy.engine").setLevel(logging.INFO)
    logging.debug("Config add_raw_images: %s", config.add_raw_images)
    logging.debug("Config steps: %s", config.steps)
    processing = ImageProcessingManager()
    for img_to_add in config.add_raw_images:
        logging.debug("Adding raw images from: %s", img_to_add)
        processing.add_raw_images(find_fits_fnames(path.abspath(img_to_add)))
    logging.debug("Starting processing...")
    processing(limit_to_steps=config.steps)
    logging.debug("Processing completed.")


if __name__ == "__main__":
    if os.name == "posix":  # Linux/macOS
        from os import getpgid, setsid, fork

        try:
            setsid()
        except OSError:
            print(f"pid={getpid():d} pgid={getpgid(0):d}")
        pid = fork()
        if pid < 0:
            raise RuntimeError("fork failed")
        if pid != 0:
            # Parent exits so the child keeps running detached.
            sys.exit(0)
        setsid()
        # Run main function in child process
        main(parse_command_line())
    elif os.name == "nt":  # Windows
        from subprocess import DETACHED_PROCESS

        if "--detached" not in sys.argv:
            try:
                with open("detached_process.log", "w") as log_file:
                    # Relaunch with --detached
                    subprocess.Popen(
                        [
                            sys.executable,
                            os.path.abspath(sys.argv[0]),
                            "--detached",
                        ]
                        + sys.argv[1:],
                        creationflags=DETACHED_PROCESS,
                        stdout=log_file,
                        stderr=log_file,
                    )
                # Exit parent process
                sys.exit(0)
            except Exception as e:
                sys.stderr.write(f"Failed to detach: {e}\n")
                sys.exit(1)
        else:
            try:
                main(parse_command_line())
            except Exception as e:
                with open("detached_process_error.log", "w") as error_log:
                    error_log.write(f"Error in main: {e}\n")