"""Collection of utilities for working with files."""
import os
import os.path
from contextlib import contextmanager
from tempfile import NamedTemporaryFile
from subprocess import call
import logging
from astropy.io import fits
_logger = logging.getLogger(__name__)
[docs]
def prepare_file_output(fname,
allow_existing,
allow_dir_creation,
delete_existing=False):
"""Ger ready to create/overwrite a file with the given name."""
result = False
if os.path.exists(fname):
if not allow_existing:
raise OSError(
f'Destination file {fname!r} already exists and overwritting '
'not allowed!'
)
if delete_existing:
_logger.info('Overwriting %s', fname)
os.remove(fname)
else:
result = True
out_path = os.path.dirname(fname)
if allow_dir_creation and out_path and not os.path.exists(out_path):
_logger.info('Creating output directory: %s',
repr(out_path))
os.makedirs(out_path)
return result
[docs]
@contextmanager
def get_unpacked_fits(fits_fname):
"""Ensure the result is an unpacked version of the frame."""
with fits.open(fits_fname, 'readonly') as fits_file:
#False positive
#pylint: disable=no-member
packed = fits_file[0].header['NAXIS'] == 0
#pylint: enable=no-member
if packed:
with NamedTemporaryFile(buffering=0, dir='/dev/shm') as unpacked_frame:
assert call(
['funpack', '-C', '-S', fits_fname],
stdout=unpacked_frame
) == 0
yield unpacked_frame.name
else:
yield fits_fname
[docs]
def get_fits_fname_root(fits_fname):
"""Return the FITS filename withou directories or extension."""
result = os.path.basename(fits_fname)
while True:
result, extension = os.path.splitext(result)
if not extension:
return result
[docs]
def get_fname_pattern_substitutions(fits_fname, fits_header=None):
"""Return a dictionary that can be used to complete a filename pattern."""
if fits_header is None:
with fits.open(fits_fname, 'readonly') as fits_file:
fits_header = fits_file[
#False positive
#pylint: disable=no-member
0 if fits_file[0].header['NAXIS'] else 1
#pylint: enable=no-member
].header
return dict(
fits_header,
FITS_ROOT=get_fits_fname_root(fits_fname),
FITS_DIR=os.path.dirname(fits_fname)
)