"""Public common step definition for OutlierDetection processing."""
from functools import partial
from pathlib import Path
from romancal.datamodels import ModelContainer
from romancal.outlier_detection import outlier_detection
from ..stpipe import RomanStep
__all__ = ["OutlierDetectionStep"]
[docs]
class OutlierDetectionStep(RomanStep):
"""Flag outlier bad pixels and cosmic rays in DQ array of each input image.
Input images can be listed in an input association file or already wrapped
with a ModelContainer. DQ arrays are modified in place.
Parameters
-----------
input_data : `~romancal.datamodels.container.ModelContainer`
A `~romancal.datamodels.container.ModelContainer` object.
"""
class_alias = "outlier_detection"
# The members of spec needs to be a super-set of all parameters needed
# by the various versions of the outlier_detection algorithms, and each
# version will pick and choose what they need while ignoring the rest.
spec = """
weight_type = option('ivm','exptime',default='ivm') # Weighting type to use to create the median image
pixfrac = float(default=1.0) # Fraction by which input pixels are shrunk before being drizzled onto the output image grid
kernel = string(default='square') # Shape of the kernel used for flux distribution onto output images
fillval = string(default='INDEF') # Value assigned to output pixels that have zero weight or no flux during drizzling
nlow = integer(default=0) # The number of low values in each pixel stack to ignore when computing the median value
nhigh = integer(default=0) # The number of high values in each pixel stack to ignore when computing the median value
maskpt = float(default=0.7) # Percentage of weight image values below which they are flagged as bad pixels
grow = integer(default=1) # The distance beyond the rejection limit for additional pixels to be rejected in an image
snr = string(default='5.0 4.0') # The signal-to-noise values to use for bad pixel identification
scale = string(default='1.2 0.7') # The scaling factor applied to derivative used to identify bad pixels
backg = float(default=0.0) # User-specified background value to subtract during final identification step
kernel_size = string(default='7 7') # Size of kernel to be used during resampling of the data
save_intermediate_results = boolean(default=False) # Specifies whether or not to write out intermediate products to disk
resample_data = boolean(default=True) # Specifies whether or not to resample the input images when performing outlier detection
good_bits = string(default="~DO_NOT_USE+NON_SCIENCE") # DQ bit value to be considered 'good'
allowed_memory = float(default=None) # Fraction of memory to use for the combined image
in_memory = boolean(default=False) # Specifies whether or not to keep all intermediate products and datamodels in memory
""" # noqa: E501
[docs]
def process(self, input_models):
"""Perform outlier detection processing on input data."""
self.skip = False
try:
self.input_models = ModelContainer(input_models)
except TypeError:
self.log.warning(
"Skipping outlier_detection - input cannot be parsed into a ModelContainer."
)
self.skip = True
return input_models
# validation
if len(self.input_models) >= 2 and all(
model.meta.exposure.type == "WFI_IMAGE" for model in self.input_models
):
# Setup output path naming if associations are involved.
asn_id = self.input_models.asn_table.get("asn_id", None)
if asn_id is not None:
_make_output_path = self.search_attr(
"_make_output_path", parent_first=True
)
self._make_output_path = partial(_make_output_path, asn_id=asn_id)
detection_step = outlier_detection.OutlierDetection
pars = {
"weight_type": self.weight_type,
"pixfrac": self.pixfrac,
"kernel": self.kernel,
"fillval": self.fillval,
"nlow": self.nlow,
"nhigh": self.nhigh,
"maskpt": self.maskpt,
"grow": self.grow,
"snr": self.snr,
"scale": self.scale,
"backg": self.backg,
"kernel_size": self.kernel_size,
"save_intermediate_results": self.save_intermediate_results,
"resample_data": self.resample_data,
"good_bits": self.good_bits,
"allowed_memory": self.allowed_memory,
"in_memory": self.in_memory,
"make_output_path": self.make_output_path,
"resample_suffix": "i2d",
}
self.log.debug(
f"Using {detection_step.__name__} class for outlier_detection"
)
# Set up outlier detection, then do detection
step = detection_step(self.input_models, **pars)
step.do_detection()
state = "COMPLETE"
if not self.save_intermediate_results:
self.log.debug(
"The following files will be deleted since \
save_intermediate_results=False:"
)
for model in self.input_models:
model.meta.cal_step["outlier_detection"] = state
if not self.save_intermediate_results:
# remove intermediate files found in
# make_output_path() and the local dir
intermediate_files_paths = [
Path(self.make_output_path()).parent,
Path().cwd(),
]
intermediate_files_suffixes = (
"*blot.asdf",
"*median.asdf",
f'*{pars.get("resample_suffix")}*.asdf',
)
for current_path in intermediate_files_paths:
for suffix in intermediate_files_suffixes:
for filename in current_path.glob(suffix):
filename.unlink()
self.log.debug(f" {filename}")
else:
# if input can be parsed into a ModelContainer
# but is not valid then log a warning message and
# skip outlier detection step
self.log.warning(
"Skipping outlier_detection - at least two imaging observations are needed."
)
# set meta.cal_step.outlier_detection to SKIPPED
for model in self.input_models:
model.meta.cal_step["outlier_detection"] = "SKIPPED"
self.skip = True
return self.input_models