Source code for romancal.resample.resample_step

import logging
import os
from copy import deepcopy

import asdf
import numpy as np
from roman_datamodels import datamodels
from stcal.alignment import util
from stpipe.extern.configobj.configobj import ConfigObj
from stpipe.extern.configobj.validate import Validator

from ..datamodels import ModelContainer
from ..stpipe import RomanStep
from . import resample

log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)

__all__ = ["ResampleStep"]


[docs] class ResampleStep(RomanStep): """ Resample input data onto a regular grid using the drizzle algorithm. .. note:: When supplied via ``output_wcs``, a custom WCS overrides other custom WCS parameters such as ``output_shape`` (now computed from by ``output_wcs.bounding_box``), ``crpix`` Parameters ----------- input : str, `roman_datamodels.datamodels.DataModel`, or `~romancal.datamodels.container.ModelContainer` If a string is provided, it should correspond to either a single ASDF filename or an association filename. Alternatively, a single DataModel instance can be provided instead of an ASDF filename. Multiple files can be processed via either an association file or wrapped by a `~romancal.datamodels.container.ModelContainer`. Returns ------- : `roman_datamodels.datamodels.MosaicModel` A mosaic datamodel with the final output frame. """ # noqa: E501 class_alias = "resample" spec = """ pixfrac = float(default=1.0) # change back to None when drizpar reference files are updated kernel = string(default='square') # change back to None when drizpar reference files are updated fillval = string(default='INDEF' ) # change back to None when drizpar reference files are updated weight_type = option('ivm', 'exptime', None, default='ivm') # change back to None when drizpar ref update output_shape = int_list(min=2, max=2, default=None) # [x, y] order crpix = float_list(min=2, max=2, default=None) crval = float_list(min=2, max=2, default=None) rotation = float(default=None) pixel_scale_ratio = float(default=1.0) # Ratio of input to output pixel scale pixel_scale = float(default=None) # Absolute pixel scale in arcsec output_wcs = string(default='') # Custom output WCS. single = boolean(default=False) blendheaders = boolean(default=True) allowed_memory = float(default=None) # Fraction of memory to use for the combined image. in_memory = boolean(default=True) good_bits = string(default='~DO_NOT_USE+NON_SCIENCE') # The good bits to use for building the resampling mask. """ # noqa: E501 reference_file_types = []
[docs] def process(self, input): if isinstance(input, datamodels.DataModel): input_models = ModelContainer([input]) # set output filename from meta.filename found in the first datamodel output = input_models[0].meta.filename self.blendheaders = False elif isinstance(input, str): # either a single asdf filename or an association filename try: # association filename input_models = ModelContainer(input) except Exception: # single ASDF filename input_models = ModelContainer([input]) if hasattr(input_models, "asn_table") and len(input_models.asn_table): # set output filename from ASN table output = input_models.asn_table["products"][0]["name"] elif hasattr(input_models[0], "meta"): # set output filename from meta.filename found in the first datamodel output = input_models[0].meta.filename elif isinstance(input, ModelContainer): input_models = input # set output filename using the common prefix of all datamodels output = ( f"{os.path.commonprefix([x.meta.filename for x in input_models])}.asdf" ) if len(output) == 0: # set default filename if no common prefix can be determined output = "resample_output.asdf" else: raise TypeError( "Input must be an ASN filename, a ModelContainer, " "a single ASDF filename, or a single Roman DataModel." ) # Check that input models are 2D images if len(input_models[0].data.shape) != 2: # resample can only handle 2D images, not 3D cubes, etc raise RuntimeError(f"Input {input_models[0]} is not a 2D image.") self.wht_type = self.weight_type self.log.info("Setting drizzle's default parameters...") kwargs = self.set_drizzle_defaults() kwargs["allowed_memory"] = self.allowed_memory # Issue a warning about the use of exptime weighting if self.wht_type == "exptime": self.log.warning("Use of EXPTIME weighting will result in incorrect") self.log.warning("propagated errors in the resampled product") # Custom output WCS parameters. # Modify get_drizpars if any of these get into reference files: kwargs["output_shape"] = self._check_list_pars( self.output_shape, "output_shape", min_vals=[1, 1] ) kwargs["output_wcs"] = self._load_custom_wcs( self.output_wcs, kwargs["output_shape"] ) kwargs["crpix"] = self._check_list_pars(self.crpix, "crpix") kwargs["crval"] = self._check_list_pars(self.crval, "crval") kwargs["rotation"] = self.rotation kwargs["pscale"] = self.pixel_scale kwargs["pscale_ratio"] = self.pixel_scale_ratio kwargs["in_memory"] = self.in_memory # Call the resampling routine resamp = resample.ResampleData(input_models, output=output, **kwargs) result = resamp.do_drizzle() for model in result: self._final_updates(model, input_models, kwargs) if len(result) == 1: result = result[0] input_models.close() return result
def _final_updates(self, model, input_models, kwargs): model.meta.cal_step["resample"] = "COMPLETE" util.update_s_region_imaging(model) if ( input_models.asn_pool_name is not None and input_models.asn_table_name is not None ): # update ASN attributes model.meta.asn.pool_name = input_models.asn_pool_name model.meta.asn.table_name = input_models.asn_table_name # if pixel_scale exists, it will override pixel_scale_ratio. # calculate the actual value of pixel_scale_ratio based on pixel_scale # because source_catalog uses this value from the header. model.meta.resample.pixel_scale_ratio = ( self.pixel_scale / np.sqrt(model.meta.photometry.pixelarea_arcsecsq) if self.pixel_scale else self.pixel_scale_ratio ) model.meta.resample.pixfrac = kwargs["pixfrac"] self.update_phot_keywords(model) model.meta.resample["good_bits"] = kwargs["good_bits"] @staticmethod def _check_list_pars(vals, name, min_vals=None): """ Check if a specific keyword parameter is properly formatted. Parameters ---------- vals : list or tuple A list or tuple containing a pair of values currently assigned to the keyword parameter `name`. Both values must be either `None` or not `None`. name : str The name of the keyword parameter. min_vals : list or tuple, optional A list or tuple containing a pair of minimum values to be assigned to `name`, by default None. Returns ------- None or list If either `vals` is set to `None` (or both of its elements), the returned result will be `None`. Otherwise, the returned result will be a list containing the current values assigned to `name`. Raises ------ ValueError This error will be raised if any of the following conditions are met: - the number of elements of `vals` is not 2; - the currently assigned values of `vals` are smaller than the minimum value provided; - one element is `None` and the other is not `None`. """ if vals is None: return None if len(vals) != 2: raise ValueError(f"List '{name}' must have exactly two elements.") n = sum(x is None for x in vals) if n == 2: return None elif n == 0: if min_vals and sum(x >= y for x, y in zip(vals, min_vals)) != 2: raise ValueError( f"'{name}' values must be larger or equal to {list(min_vals)}" ) return list(vals) else: raise ValueError(f"Both '{name}' values must be either None or not None.") @staticmethod def _load_custom_wcs(asdf_wcs_file, output_shape): if not asdf_wcs_file: return None with asdf.open(asdf_wcs_file) as af: wcs = deepcopy(af.tree["wcs"]) if output_shape is not None: wcs.array_shape = output_shape[::-1] elif wcs.pixel_shape is not None: wcs.array_shape = wcs.pixel_shape[::-1] elif wcs.bounding_box is not None: wcs.array_shape = tuple( int(axs[1] - axs[0] + 0.5) for axs in wcs.bounding_box.bounding_box(order="C") ) elif wcs.array_shape is None: raise ValueError( "Step argument 'output_shape' is required when custom WCS " "does not have neither of 'array_shape', 'pixel_shape', or " "'bounding_box' attributes set." ) return wcs
[docs] def update_phot_keywords(self, model): """Update pixel scale keywords""" if model.meta.photometry.pixelarea_steradians is not None: model.meta.photometry.pixelarea_steradians *= ( model.meta.resample.pixel_scale_ratio**2 ) if model.meta.photometry.pixelarea_arcsecsq is not None: model.meta.photometry.pixelarea_arcsecsq *= ( model.meta.resample.pixel_scale_ratio**2 )
[docs] def set_drizzle_defaults(self): """Set the default parameters for drizzle.""" configspec = self.load_spec_file() config = ConfigObj(configspec=configspec) if config.validate(Validator()): kwargs = config.dict() if self.pixfrac is None: self.pixfrac = 1.0 if self.kernel is None: self.kernel = "square" if self.fillval is None: self.fillval = "INDEF" # Force definition of good bits kwargs["good_bits"] = self.good_bits kwargs["pixfrac"] = self.pixfrac kwargs["kernel"] = str(self.kernel) kwargs["fillval"] = str(self.fillval) # self.weight_type has a default value of None # The other instruments read this parameter from a reference file if self.wht_type is None: self.wht_type = "ivm" kwargs["wht_type"] = str(self.wht_type) kwargs["pscale_ratio"] = self.pixel_scale_ratio kwargs.pop("pixel_scale_ratio") for k, v in kwargs.items(): if k in [ "pixfrac", "kernel", "fillval", "wht_type", "pscale_ratio", ]: log.info(" using: %s=%s", k, repr(v)) return kwargs