Source code for autowisp.tests.h5_test_case

"""Define class to compare groups in DR files."""

from os import path
from glob import glob

import numpy
import h5py


from autowisp.tests import AutoWISPTestCase


class H5TestCase(AutoWISPTestCase):
    """Add assert for comparing groups in HDF5 files."""

    def assert_groups_match(self, dr_fname1, dr_fname2, group_name, ignore):
        """
        Check that a group (or dataset) is the same in two HDF5 files.

        Every object visited under ``group_name`` in the first file must exist
        in the second file with the same set of attribute names, matching
        attribute values and, for datasets, matching shape and data. Floating
        point values are compared with ``rtol=1e-8``, ``atol=1e-8`` and NaNs
        treated as equal; everything else must be exactly equal.

        Only the first file is traversed, so objects that exist solely in the
        second file are not detected. Call again with the file names swapped
        to check for those.

        Args:
            dr_fname1(str):    Name of the first HDF5 file (the one whose
                contents drive the traversal).

            dr_fname2(str):    Name of the second HDF5 file.

            group_name(str):    The group or dataset to compare. If it is
                missing from the first file, it must also be missing from the
                second one.

            ignore(callable or None):    If not None, called with the full
                HDF5 path of each visited object; objects for which it returns
                a true value are skipped.

        Returns:
            None
        """

        def assert_dset_match(dset1, dset2):
            """Assert that the two datasets contain the same data."""

            # Without this check numpy.isclose() would broadcast, so datasets
            # of different (but compatible) shapes could compare as equal and
            # incompatible shapes would raise instead of failing the test.
            self.assertEqual(
                dset1.shape,
                dset2.shape,
                f"Shapes of datasets {dr_fname1!r}/{dset1.name!r} and "
                f"{dr_fname2!r}/{dset2.name!r} do not match.",
            )

            if dset1.dtype.kind == "f":
                differ = numpy.logical_not(
                    numpy.isclose(
                        dset1[:],
                        dset2[:],
                        rtol=1e-8,
                        atol=1e-8,
                        equal_nan=True,
                    )
                )
                self.assertFalse(
                    differ.any(),
                    f"Data in datasets {dr_fname1!r}/{dset1.name!r} and "
                    f"{dr_fname2!r}/{dset2.name!r} do not match (differing "
                    f"elements: {numpy.nonzero(differ)}).",
                )
            else:
                self.assertTrue(
                    numpy.array_equal(dset1[:], dset2[:]),
                    f"Data in datasets {dr_fname1!r}/{dset1.name!r} and "
                    f"{dr_fname2!r}/{dset2.name!r} do not match.",
                )

        with h5py.File(dr_fname1, "r") as dr1, h5py.File(
            dr_fname2, "r"
        ) as dr2:
            if group_name not in dr1:
                # The group is allowed to be absent, but only from both files.
                self.assertTrue(
                    group_name not in dr2,
                    f"Group {group_name!r} not found in {dr_fname1}.",
                )
                return
            self.assertTrue(
                group_name in dr2,
                f"Group {group_name!r} not found in {dr_fname2}.",
            )

            def assert_obj_match(_, obj1):
                """Assert the two datasets or groups contain the same data."""

                if ignore is not None and ignore(obj1.name):
                    return

                # Report a missing object as a test failure with a clear
                # message rather than as a KeyError from the lookup below.
                self.assertIn(
                    obj1.name,
                    dr2,
                    f"Object {dr_fname1!r}/{obj1.name!r} not found in "
                    f"{dr_fname2!r}.",
                )
                obj2 = dr2[obj1.name]

                self.assertEqual(
                    set(obj1.attrs.keys()),
                    set(obj2.attrs.keys()),
                    f"Attributes in {dr_fname1!r}/{obj1.name!r} and "
                    f"{dr_fname2!r}/{obj2.name!r} do not match.",
                )
                for key, value in obj1.attrs.items():
                    msg = (
                        f"Attribute {dr_fname1!r}/{obj1.name!r}.{key} does "
                        f"not match {dr_fname2!r}/{obj2.name!r}.{key}."
                    )
                    value_array = numpy.atleast_1d(value)
                    if value_array.dtype.kind == "f":
                        self.assertTrue(
                            numpy.allclose(
                                obj2.attrs[key],
                                value,
                                rtol=1e-8,
                                atol=1e-8,
                                equal_nan=True,
                            ),
                            msg,
                        )
                    elif value_array.size > 1:
                        self.assertTrue(
                            numpy.array_equal(obj2.attrs[key], value), msg
                        )
                    else:
                        self.assertEqual(obj2.attrs[key], value, msg)

                if isinstance(obj1, h5py.Dataset):
                    self.assertTrue(
                        isinstance(obj2, h5py.Dataset),
                        f"Object {dr_fname2!r}/{obj2.name!r} is not a "
                        "dataset!",
                    )
                    # The contents of datasets named ``MaxSources`` are not
                    # compared (their attributes and type still are).
                    if not obj1.name.endswith("/MaxSources"):
                        assert_dset_match(obj1, obj2)

            if isinstance(dr1[group_name], h5py.Dataset):
                # visititems() only exists on groups, so a dataset is checked
                # directly.
                assert_obj_match(None, dr1[group_name])
            else:
                dr1[group_name].visititems(assert_obj_match)

    # pylint: disable=too-many-arguments
    def run_step_test(
        self, step_name, inputs, compare, *, ignore=None, output_type="DR"
    ):
        """
        Run a test of a single step that updates the DR files.

        Args:
            step_name(str):    The name of the step being tested. The command
                that gets run is ``wisp-<step_name>`` with underscores
                replaced by dashes.

            inputs(str or [str]):    List of the directories or files needed
                by the step. The first entry (with full path added) is passed
                as input to the step. A single string is treated as a list
                with one entry.

            compare([str]):    List of the HDF5 groups to compare in order to
                ensure the step produced correct results.

            ignore(callable):    Function that returns true on any dataset or
                group in the HDF5 file that should not be compared when it is
                under the groups specified in ``compare``.

            output_type(str):    The type of output files produced by the step
                (i.e. which files should be compared). Used as the name of the
                sub-directory searched for ``*.h5`` files.

        Returns:
            None
        """

        if isinstance(inputs, str):
            inputs = [inputs]
        self.get_inputs(inputs)

        # Delete any pre-existing copies of the groups to compare, presumably
        # so that the check below can only pass on freshly generated results.
        for fname in glob(
            path.join(self.processing_directory, output_type, "*.h5")
        ):
            with h5py.File(fname, "a") as h5_file:
                for group in compare:
                    if group in h5_file:
                        del h5_file[group]

        self.run_step(
            [
                f"wisp-{step_name.replace('_', '-')}",
                "-c",
                "test.cfg",
                path.join(self.processing_directory, inputs[0]),
            ]
        )

        generated = sorted(
            glob(path.join(self.processing_directory, output_type, "*.h5"))
        )
        expected = sorted(
            glob(path.join(self.test_directory, output_type, "*.h5"))
        )
        # assertEqual (rather than assertTrue on ==) reports which file names
        # differ when the check fails.
        self.assertEqual(
            [path.basename(fname) for fname in generated],
            [path.basename(fname) for fname in expected],
            "Generated files do not match expected files!",
        )

        for gen_fname, exp_fname in zip(generated, expected):
            for group in compare:
                # Compare in both directions, since each call only traverses
                # the objects of its first file.
                self.assert_groups_match(gen_fname, exp_fname, group, ignore)
                self.assert_groups_match(exp_fname, gen_fname, group, ignore)

        self.successful_test = True

    # pylint: enable=too-many-arguments