"""Define base class for processing images or lightcurves."""
from abc import ABC, abstractmethod
import logging
import os
from os import path, getpid
from tempfile import NamedTemporaryFile, TemporaryDirectory
from socket import getfqdn
from psutil import pid_exists, Process
from sqlalchemy import sql, select
from numpy import inf as infinity
from autowisp.multiprocessing_util import setup_process
from autowisp.database.interface import Session
from autowisp import Evaluator
from autowisp.fits_utilities import get_primary_header
from autowisp.image_calibration.fits_util import (
add_required_keywords,
add_channel_keywords,
)
from autowisp.database.user_interface import get_db_configuration
from autowisp.data_reduction.data_reduction_file import DataReductionFile
from autowisp.light_curves.light_curve_file import LightCurveFile
from autowisp import processing_steps
# False positive due to unusual importing
# pylint: disable=no-name-in-module
from autowisp.database.data_model import (
Condition,
ConditionExpression,
Configuration,
ImageType,
ImageProcessingProgress,
InputMasterTypes,
LightCurveProcessingProgress,
MasterFile,
MasterType,
Step,
)
# pylint: enable=no-name-in-module
[docs]
class ProcessingInProgress(Exception):
    """
    Signal that a processing step is already active elsewhere.

    Raised when the database shows an unfinished processing progress entry
    for the requested step that belongs to a process which is still alive
    (possibly on a different host).
    """

    def __init__(self, processing):
        """Record the step, target, host and PID of the live processing."""
        #Lightcurve processing records identify their target by single
        #photometric reference instead of image type.
        if hasattr(processing, "image_type"):
            self.target = processing.image_type.name + " images"
        else:
            self.target = processing.sphotref.filename + " lightcurves"
        self.step = processing.step.name
        self.host = processing.host
        self.process_id = processing.process_id

    def __str__(self):
        """Human-readable description of the conflicting processing."""
        return (
            f"Processing of {self.target} by {self.step} step on "
            f"{self.host!r} is still running with process id "
            f"{self.process_id!r}!"
        )
# pylint: disable=too-many-instance-attributes
[docs]
class ProcessingManager(ABC):
"""
Utilities for automated processing of images or lightcurves.
Attrs:
configuration(dict): Indexed by parameter name with values further
dictionaries with keys:
``version``: the actual version used including fallback
``value``: dict indexed by frozenset of expression IDs that an
image must satisfy for the parameter to have a given value.
condition_expressions({int: str}): Dictionary of condition
expressions that must be evaluated against the header of each input
images to determine the exact values of the configuration parameters
applicable to a given image. Keys are the condition expression IDs
from the database and values are the actual expressions.
step_version(dict): Indexed by step name of the largest value of the
actual version used for any parameter required by that step.
current_step(Step): The currently active step.
_current_processing(ImageProcessingProgress): The currently active
step (the processing progress initiated the last time `start_step()`
was called).
_processed_ids(dict): The keys are the filenames of the required
inputs (DR or FITS) for the current step and the values are
dictionaries with keys ``'image_id'`` and ``'channel'`` identifying
what was processed.
_evaluated_expressions(dict): Indexed by image ID and then channel,
dictionary containing dictionary with keys:
* values: the values of the condition expressions for
the given image and channel indexed by their expression IDs.
* matched: A set of the expression IDs for which the
corresponding expression converts to boolean True.
* calibrated: the filename of the calibrated image
* dr: the filename of the data reduction file
* masters: A dictionary indexed by master type name of the best
master of the given type to apply to the image
An additional entry with channel=None is included which contains
just the common (intersection) set of expressions satisfied for all
channels.
_master_expressions(dict): Indexed by master type, then tuple of
expression values ordered by expression ID of the masters of the
given type that match the given expression values.
pending(dict): Information about what images or lightcurves still
need processing by the various steps. The format is different for
image vs lightcurve processing managers.
"""
[docs]
def get_param_values(
    self, matched_expressions, parameters=None, db_session=None
):
    """
    Return the values to use for the given parameters.

    Args:
        matched_expressions(set): Set of expression IDs that the image we
            are getting configuration for matches.

        parameters(iterable or str or Step): Iterable of parameter names,
            a database Step instance, or a step name to get configuration
            for. Defaults to the current step if not specified.

        db_session: Session to use for DB queries. Only needed if
            specifying parameters by step name or using the default.

    Returns:
        dict: The value to use for each requested parameter, indexed by
        parameter name.

    Raises:
        ValueError: If no configured value applies for some parameter
            given the matched expressions.
    """

    def resolve(param):
        """Return the configured value of a single parameter."""
        #Entries are keyed by the frozenset of expression IDs an image
        #must match for the value to apply.
        for needed_expressions, value in self.configuration[param][
            "value"
        ].items():
            if needed_expressions <= matched_expressions:
                return value
        raise ValueError(f"No viable configuration found for {param}")

    if parameters is None:
        parameters = self.current_step
    if isinstance(parameters, str):
        db_step = db_session.scalar(select(Step).filter_by(name=parameters))
        param_names = [param.name for param in db_step.parameters]
    elif isinstance(parameters, Step):
        param_names = [param.name for param in parameters.parameters]
    else:
        param_names = parameters
    return {param: resolve(param) for param in param_names}
[docs]
def _write_config_file(
self,
matched_expressions,
outf,
db_session,
*,
db_steps=None,
step_names=None,
):
"""
Write to given file configuration for given matched expressions.
Returns:
Set of tuples of parameters and values as set in the file. Used for
comparing configurations.
"""
# TODO: exclude master options
if db_steps is None:
if step_names is None:
steps = db_session.scalars(select(Step).order_by(Step.id)).all()
else:
steps = [
db_session.execute(
select(Step).filter_by(name=name)
).scalar_one()
for name in step_names
]
return self._write_config_file(
matched_expressions, outf, db_steps=steps, db_session=db_session
)
result = set()
for step in db_steps:
self._logger.debug("Getting configuration for %s step", step.name)
outf.write(f"[{step.name}]\n")
step_config = self.get_param_values(
matched_expressions, [param.name for param in step.parameters]
)
self._logger.debug(
"Adding configuration for %s step to config file", step.name
)
for param, value in step_config.items():
if value is not None:
outf.write(f" {param} = {value!r}\n")
self._logger.debug(" %s = %s", param, repr(value))
result.add((param, value))
self._logger.debug(" %s in result", param)
outf.write("\n")
return frozenset(result)
[docs]
def _get_best_master(self, candidate_masters, image_eval):
"""Find the best master from given list for given image/channel."""
self._logger.debug(
"Selecting best master for %s, channel %s from %s",
repr(image_eval("RAWFNAME")),
repr(image_eval("CLRCHNL")),
repr(candidate_masters),
)
if not candidate_masters:
return None
best_master_value = infinity
best_master_fname = None
for master_fname, use_smallest in candidate_masters:
assert use_smallest is not None
master_value = image_eval(use_smallest)
if master_value < best_master_value:
best_master_value = master_value
best_master_fname = master_fname
assert best_master_fname
return best_master_fname
[docs]
def _get_master(self, master_type, image_values, image_eval, db_session):
    """
    Return the master that should be used for the given image.

    Args:
        master_type(MasterType): Database instance of the master type to
            look up.

        image_values(dict): Values of the condition expressions for the
            image/channel, indexed by expression ID.

        image_eval: Evaluator set up with the image header, used to rank
            multiple candidate masters.

        db_session: Database session for querying masters and
            expressions.

    Returns:
        str or None: Filename of the selected master, or None when no
        enabled master of the given type matches.
    """
    #The expressions whose values must agree between master and image,
    #ordered by ID so that value tuples are directly comparable.
    expressions = db_session.execute(
        select(ConditionExpression.id, ConditionExpression.expression)
        .join_from(
            Condition,
            ConditionExpression,
            Condition.expression_id == ConditionExpression.id,
        )
        .where(Condition.id == master_type.condition_id)
        .order_by(ConditionExpression.id)
    ).all()
    #Lazily build and cache the expression values for all enabled
    #masters of this type, grouped by their expression value tuples.
    if master_type.name not in self._master_expressions:
        self._logger.debug(
            "Evaluating expressions for all masters of type: %s",
            repr(master_type.name),
        )
        self._master_expressions[master_type.name] = {}
        all_masters = db_session.execute(
            select(MasterFile.filename, MasterFile.use_smallest).filter_by(
                type_id=master_type.id,
                enabled=True,
            )
        ).all()
        for master in all_masters:
            #Evaluate the match expressions against the master file
            #itself.
            master_eval = Evaluator(master.filename)
            expr_values = tuple(
                master_eval(expr) for _, expr in expressions
            )
            if (
                expr_values
                not in self._master_expressions[master_type.name]
            ):
                self._master_expressions[master_type.name][expr_values] = []
            self._master_expressions[master_type.name][expr_values].append(
                master
            )
    #Only masters whose expression values exactly match the image's are
    #viable candidates.
    expr_values = tuple(image_values[expr_id] for expr_id, _ in expressions)
    self._logger.debug(
        "Master %s available for: %s",
        master_type.name,
        repr(self._master_expressions[master_type.name].keys()),
    )
    candidates = self._master_expressions[master_type.name].get(
        expr_values, []
    )
    self._logger.debug("Candidate Masters: %s", repr(candidates))
    if len(candidates) == 1:
        return candidates[0].filename
    return self._get_best_master(candidates, image_eval)
[docs]
def _get_evaluated_entry(
    self, evaluate, image_type_id, calib_config, db_session
):
    """
    Return entry to add to self._evaluated_expressions.

    Args:
        evaluate: Evaluator set up with the header of the image/channel
            being processed.

        image_type_id(int): ID of the image's type, used to find the
            master types the image needs.

        calib_config(dict): Calibration step configuration supplying the
            ``calibrated_fname`` template.

        db_session: Database session for master queries.

    Returns:
        dict: With keys ``values``, ``matched``, ``calibrated``, ``dr``
        and ``masters`` (see the class docstring for their meaning).
    """
    evaluated_expressions = {
        "values": {
            expr_id: evaluate(expression)
            for expr_id, expression in self.condition_expressions.items()
        },
        #Filename templates are filled from the header-derived symbols.
        "calibrated": calib_config["calibrated_fname"].format_map(
            evaluate.symtable
        ),
        "masters": {},
    }
    evaluated_expressions["matched"] = set(
        expr_id
        for expr_id, value in evaluated_expressions["values"].items()
        if value
    )
    #Use the first data reduction filename template whose required
    #expressions are all matched by this image/channel.
    for required_expressions, value in self.configuration[
        "data-reduction-fname"
    ]["value"].items():
        if required_expressions <= evaluated_expressions["matched"]:
            evaluated_expressions["dr"] = value.format_map(
                evaluate.symtable
            )
            break
    assert "dr" in evaluated_expressions
    for master_type in db_session.scalars(
        select(MasterType)
        .join(InputMasterTypes)
        .where(InputMasterTypes.image_type_id == image_type_id)
        .distinct()
    ):
        #Skip the detrending statistics master types (epd_stat,
        #tfa_stat); presumably handled elsewhere - TODO confirm.
        if master_type.name not in ["epd_stat", "tfa_stat"]:
            evaluated_expressions["masters"][master_type.name] = (
                self._get_master(
                    master_type,
                    evaluated_expressions["values"],
                    evaluate,
                    db_session,
                )
            )
    return evaluated_expressions
[docs]
def get_matched_expressions(self, evaluate):
"""Return set of matching expressions given an evaluator for image."""
def check(expr):
"""Return True if expression evaluates True."""
try:
return evaluate(expr)
except NameError:
return False
return set(
expr_id
for expr_id, expression in self.condition_expressions.items()
if check(expression)
)
[docs]
def evaluate_expressions_image(self, image, db_session):
    """
    Evaluate and cache all condition expressions for the given image.

    Args:
        image(Image): Instance of database Image for which to evaluate
            the condition expressions. The image header is augmented by
            the ``IMAGE_TYPE`` keyword set to the name of the image type
            of the given image and ``OBS_SESN`` set to the observing
            session label.

        db_session: Used to select the best master.

    Returns:
        None: The results are stored in ``self._evaluated_expressions``
        under the image ID - one entry per channel plus a ``None`` entry
        holding the intersection over all channels.
    """
    #Already evaluated - nothing to do.
    if image.id in self._evaluated_expressions:
        return
    self._logger.debug("Evaluating expressions for: %s", repr(image))
    evaluate = Evaluator(get_primary_header(image.raw_fname, True))
    evaluate.symtable.update(
        IMAGE_TYPE=image.image_type.name,
        OBS_SESN=image.observing_session.label,
    )
    self._logger.debug(
        "Matched expressions: %s",
        repr(self.get_matched_expressions(evaluate)),
    )
    self._evaluated_expressions[image.id] = {}
    #Accumulates the matched expressions and values common to all
    #channels of the image.
    all_channel = {"matched": None, "values": None}
    for channel_name, channel_slice in self._get_split_channels(
        image
    ).items():
        self._logger.debug(
            "Adding channel keywords for channel %s of %s",
            channel_name,
            image.raw_fname,
        )
        add_channel_keywords(evaluate.symtable, channel_name, channel_slice)
        self._logger.debug(
            "Configuring for channel %s (%s) of %s",
            channel_name,
            evaluate.symtable["CLRCHNL"],
            image.raw_fname,
        )
        calib_config = self.get_config(
            self.get_matched_expressions(evaluate),
            db_session,
            step_name="calibrate",
        )[0]
        self._logger.debug(
            "Raw HDU for channel %s (%s) of %s: %s",
            channel_name,
            evaluate.symtable["CLRCHNL"],
            image.raw_fname,
            repr(calib_config["raw_hdu"]),
        )
        add_required_keywords(evaluate.symtable, calib_config, True)
        evaluated_expressions = self._get_evaluated_entry(
            evaluate, image.image_type_id, calib_config, db_session
        )
        if all_channel["matched"] is None:
            #First channel initializes the common matched set/values.
            all_channel["matched"] = evaluated_expressions["matched"]
            all_channel["values"] = dict(evaluated_expressions["values"])
        else:
            all_channel["matched"] = (
                all_channel["matched"] & evaluated_expressions["matched"]
            )
            #Drop values that differ between channels, keeping only
            #expressions with channel-independent values.
            # False positive
            # pylint: disable=unsubscriptable-object
            # pylint: disable=unsupported-delete-operation
            for expr_id in list(all_channel["values"].keys()):
                if (
                    all_channel["values"][expr_id]
                    != evaluated_expressions["values"][expr_id]
                ):
                    del all_channel["values"][expr_id]
            # pylint: enable=unsupported-delete-operation
            # pylint: enable=unsubscriptable-object
        self._evaluated_expressions[image.id][
            channel_name
        ] = evaluated_expressions
    #The channel-independent summary entry.
    self._evaluated_expressions[image.id][None] = {
        "matched": all_channel["matched"],
        "values": all_channel["values"],
    }
    self._logger.debug(
        "Evaluated expressions for image %s: %s",
        image,
        repr(self._evaluated_expressions[image.id]),
    )
[docs]
def get_product_fname(self, image_id, channel, product):
"""
Return the ``dr`` or ``calibrated`` filename of specified image/channel.
`self.evaluate_image_expressions()` must already have been called for
this image.
"""
return self._evaluated_expressions[image_id][channel][product]
[docs]
def get_master_fname(self, image_id, channel, master_type_name):
"""Return the filename of best master for a given image/channel."""
return self._evaluated_expressions[image_id][channel]["masters"][
master_type_name
]
[docs]
def _check_running_processing(
    self, running_processing, this_host, db_session
):
    """
    Check if any unfinished processing progresses are still running.

    Args:
        running_processing: Iterable of processing progress database
            records to check (entries may be None).

        this_host(str): Fully qualified domain name of this machine.

        db_session: Database session; flushed after marking crashed
            entries as finished.

    Raises:
        ProcessingInProgress: If an unfinished entry appears to belong
            to a live ``processing.py`` process.
    """
    for processing in running_processing:
        if processing is not None and not processing.finished:
            #Treat the processing as alive if it runs on another host
            #(no way to check remotely) or if its PID exists locally
            #and the process is running processing.py.
            if processing.host != this_host or (
                pid_exists(processing.process_id)
                and path.basename(
                    Process(processing.process_id).cmdline()[1]
                )
                == "processing.py"
            ):
                raise ProcessingInProgress(processing)
            #Otherwise the recorded process died without updating the
            #database - mark it finished now.
            self._logger.warning(
                "Processing progress %s appears to have crashed.",
                processing,
            )
            # False positive
            # pylint: disable=not-callable
            processing.finished = sql.func.now()
            # pylint: enable=not-callable
            db_session.flush()
[docs]
def _create_current_processing(self, step, target, db_session):
    """
    Add a new ProcessingProgress at start of given step.

    Args:
        step(Step): The database step being started.

        target(tuple): Pair of attribute name (``"image_type"`` for
            image processing) and ID identifying what the step
            processes.

        db_session: Database session used to record the progress.

    Raises:
        ProcessingInProgress: If an unfinished progress entry for the
            same step/target/configuration belongs to a live process.
    """
    this_host = getfqdn()
    process_id = getpid()
    self.current_step = step
    #Image and lightcurve processing record progress in separate tables.
    progress_class = (
        ImageProcessingProgress
        if target[0] == "image_type"
        else LightCurveProcessingProgress
    )
    #Refuse to start if a matching processing is already running; clean
    #up any crashed entries.
    self._check_running_processing(
        db_session.scalars(
            select(progress_class).where(
                (progress_class.step_id == self.current_step.id),
                (getattr(progress_class, target[0] + "_id") == target[1]),
                (
                    progress_class.configuration_version
                    == self.step_version[step.name]
                ),
            )
        ).all(),
        this_host,
        db_session,
    )
    self._current_processing = progress_class(
        step_id=step.id,
        **{target[0] + "_id": target[1]},
        configuration_version=self.step_version[step.name],
        host=this_host,
        process_id=process_id,
        # False positive
        # pylint: disable=not-callable
        started=sql.func.now(),
        # pylint: enable=not-callable
        finished=None,
    )
    db_session.add(self._current_processing)
    db_session.flush()
[docs]
@abstractmethod
def _cleanup_interrupted(self, db_session):
    """
    Cleanup previously interrupted processing for the current step.

    Args:
        db_session: Database session to use for the cleanup queries.
    """
[docs]
def _get_split_channels(self, image):
"""Return the ``split_channels`` option for the given image."""
return {
channel.name: (
slice(channel.y_offset, None, channel.y_step),
slice(channel.x_offset, None, channel.x_step),
)
for channel in image.observing_session.camera.channels
}
[docs]
def __init__(self, version=None, dummy=False):
    """
    Set the public class attributes per the given configuration version.

    Args:
        version(int): The version of the parameters to get. If a
            parameter value is not specified for this exact version use the
            value with the largest version not exceeding ``version``. By
            default use the latest configuration version in the database.

        dummy(bool): If set to true, all logging is suppressed and no
            processing can be performed. Useful for reviewing the results
            of past processing.

    Returns:
        None
    """
    if dummy:
        logging.disable()
    #Make sure the HDF5 file layouts are loaded before any file is used.
    DataReductionFile.get_file_structure()
    LightCurveFile.get_file_structure()
    self._logger = logging.getLogger(__name__)
    self.current_step = None
    self._current_processing = None
    self.configuration = {}
    self.condition_expressions = {}
    self._evaluated_expressions = {}
    self._master_expressions = {}
    self._processed_ids = {}
    self.pending = {}
    self._some_failed = False
    # False positive
    # pylint: disable=no-member
    with Session.begin() as db_session:
        # pylint: enable=no-member
        if version is None:
            version = db_session.execute(
                # False positive
                # pylint: disable=not-callable
                # pylint: disable=no-member
                select(sql.func.max(Configuration.version))
                # pylint: enable=not-callable
                # pylint: enable=no-member
            ).scalar_one()
        db_configuration = get_db_configuration(version, db_session)
        #Group configured values by parameter name, keyed by the set of
        #condition expression IDs required for each value to apply.
        for config_entry in db_configuration:
            if config_entry.parameter.name not in self.configuration:
                self.configuration[config_entry.parameter.name] = {
                    "version": config_entry.version,
                    "value": {},
                }
            self.configuration[config_entry.parameter.name]["value"][
                frozenset(
                    cond.expression_id for cond in config_entry.conditions
                )
            ] = config_entry.value
            for cond in config_entry.conditions:
                if cond.expression_id not in self.condition_expressions:
                    self.condition_expressions[cond.expression_id] = (
                        cond.expression.expression
                    )
        self._processing_config = self.get_config(
            self.get_matched_expressions(Evaluator()),
            db_session,
            step_name="add_images_to_db",
        )[0]
        #These two entries are supplied explicitly by each step.
        del self._processing_config["processing_step"]
        del self._processing_config["image_type"]
        if not dummy:
            setup_process(
                task="main",
                parent_pid="",
                processing_step="init_processing",
                image_type="none",
                **self._processing_config,
            )
        #Master types contribute additional expressions that must be
        #evaluated for each image.
        for master_type in db_session.scalars(select(MasterType)).all():
            for expression in (
                master_type.match_expressions
                + master_type.split_expressions
            ):
                self.condition_expressions[expression.id] = (
                    expression.expression
                )
        self._logger.debug(
            "Condition expressions to evaluate: %s",
            repr(self.condition_expressions),
        )
        #A step's effective version is the newest version among the
        #versions of its parameters.
        self.step_version = {
            step.name: max(
                self.configuration[param.name]["version"]
                for param in step.parameters
            )
            for step in db_session.scalars(select(Step)).all()
        }
        if not dummy:
            self._cleanup_interrupted(db_session)
[docs]
def get_config(
    self,
    matched_expressions,
    db_session,
    *,
    db_step=None,
    step_name=None,
    image_id=None,
    channel=None,
):
    """
    Return the configuration for the given step for given expressions.

    Args:
        matched_expressions(set): IDs of the condition expressions
            satisfied by the image being configured. If None, they are
            looked up from the already evaluated expressions of the
            given ``image_id``/``channel``.

        db_session: Database session for the configuration queries.

        db_step(Step): The database step to configure (alternative to
            ``step_name``; exactly one of the two must be supplied).

        step_name(str): Name of the step to configure.

        image_id: Image whose matched expressions to use when
            ``matched_expressions`` is None.

        channel(str): Channel whose matched expressions to use when
            ``matched_expressions`` is None.

    Returns:
        tuple: The parsed configuration (as returned by the step's
        ``parse_command_line()``) and the frozenset of (parameter,
        value) pairs written to the temporary config file.
    """
    assert db_step or step_name
    if matched_expressions is None:
        assert image_id is not None and channel is not None
        matched_expressions = self._evaluated_expressions[image_id][
            channel
        ]["matched"]
    with TemporaryDirectory() as temp_dir:
        temp_file_path = path.join(temp_dir, "config_file.tmp")
        #Specify the encoding explicitly (consistent with
        #create_config_file()) so parsing does not depend on the locale
        #of the machine running the processing.
        with open(
            temp_file_path, mode="w", encoding="utf-8"
        ) as config_file:
            config_key = self._write_config_file(
                matched_expressions,
                config_file,
                db_steps=[db_step] if db_step else None,
                step_names=[step_name] if not db_step else None,
                db_session=db_session,
            )
            self._logger.debug(
                "Flushing config file %s", repr(config_file.name)
            )
            config_file.flush()
            #Ensure the parser below sees the full file contents
            #(cross-platform).
            os.fsync(config_file.fileno())
            self._logger.debug(
                "Wrote config file %s", repr(config_file.name)
            )
            return (
                getattr(
                    processing_steps, db_step.name if db_step else step_name
                ).parse_command_line(["-c", config_file.name]),
                config_key,
            )
[docs]
@abstractmethod
def set_pending(self, db_session):
    """
    Set the unprocessed images and channels split by step and image type.

    Implementations are expected to fill ``self.pending`` (see the class
    docstring).

    Args:
        db_session(Session): The database session to use.

    Returns:
        {(step.id, image_type.id): (Image, str)}:
            The images and channels of the specified type for which the
            specified step has not applied with the current configuration.
    """
[docs]
def add_masters(self, new_masters, step_name=None, image_type_name=None):
    """
    Add new master files to the database.

    Args:
        new_masters(dict or iterable of dicts): Information about the new
            master(s) to add. Each dictionary should include:

            * type: The type of master being added.

            * filename: The full path to the new master file.

            * preference_order: Expression to select among multiple possible
              masters. For each frame the expression for each candidate
              master is evaluated using the frame header and the master
              with the smallest resulting value is used.

            * disable(bool): Optional. If set to True the masters are
              recorded in the database, but not flagged enabled.

        step_name(str): The name of the step that generated the
            masters.

        image_type_name(str): The name of the type of images whose
            processing created the masters.

    Returns:
        None
    """
    self._logger.debug(
        "Adding new masters from %s step for %s images:\n%s",
        step_name,
        image_type_name,
        repr(new_masters),
    )
    # False positive
    # pylint: disable=no-member
    with Session.begin() as db_session:
        # pylint: enable=no-member
        #Allocate IDs continuing from the current maximum in the table.
        master_id = (
            db_session.scalar(
                # False positive
                # pylint: disable=not-callable
                select(sql.func.max(MasterFile.id))
                # pylint: enable=not-callable
            )
            or 0
        ) + 1
        type_id_select = select(MasterType.id)
        if step_name is not None:
            assert image_type_name is not None
            #Narrow the master type lookup to the given step/image type.
            type_id_select = (
                type_id_select.join(ImageType)
                .join(Step)
                .where(
                    Step.name == step_name,
                    ImageType.name == image_type_name,
                )
            )
        if isinstance(new_masters, dict):
            new_masters = (new_masters,)
        #Re-attach the in-memory progress record to this session so its
        #ID can be referenced below.
        if self._current_processing is not None:
            self._current_processing = db_session.merge(
                self._current_processing, load=False
            )
        for master in new_masters:
            if len(new_masters) > 1 or step_name is None:
                master_type_id = db_session.scalar(
                    type_id_select.where(MasterType.name == master["type"])
                )
            else:
                master_type_id = db_session.scalar(type_id_select)
            db_session.add(
                MasterFile(
                    id=master_id,
                    type_id=master_type_id,
                    progress_id=(
                        None
                        if self._current_processing is None
                        else self._current_processing.id
                    ),
                    filename=master["filename"],
                    use_smallest=master["preference_order"],
                    enabled=not master.get("disable", False),
                )
            )
            master_id += 1
[docs]
def create_config_file(self, example_header, outf, steps=None):
    """
    Save configuration for processing given header to given output file.

    Args:
        example_header(str or dict-like): The header to use
            to determine the values of the configuration parameters. Can be
            passed directly as a header instance or FITS or DR filename.

        outf(file or str): The file to write the configuration to. Can be
            passed as something providing a write method or filename.
            Overwritten if exists.

        steps(list): If specified, only configuration parameters required
            by these steps will be included.

    Returns:
        None
    """
    matched_expressions = self.get_matched_expressions(
        Evaluator(example_header)
    )
    # False positive
    # pylint: disable=no-member
    with Session.begin() as db_session:
        # pylint: enable=no-member
        #Accept either a filename (opened here) or an already writable
        #object.
        if isinstance(outf, str):
            with open(outf, "w", encoding="utf-8") as opened_outf:
                self._write_config_file(
                    matched_expressions,
                    opened_outf,
                    step_names=steps,
                    db_session=db_session,
                )
        else:
            self._write_config_file(
                matched_expressions,
                outf,
                step_names=steps,
                db_session=db_session,
            )
[docs]
@abstractmethod
def __call__(self, limit_to_steps=None):
    """
    Perform all the processing for the given steps (all if None).

    Args:
        limit_to_steps: Collection of step names to restrict the
            processing to. If None, all steps are performed.
    """
# pylint: enable=too-many-locals
# pylint: enable=too-many-branches
# pylint: enable=too-many-instance-attributes