Source code for romancal.stpipe.core

"""
Roman Calibration Pipeline base class
"""

import importlib.metadata
import logging
import time

import roman_datamodels as rdm
from roman_datamodels.datamodels import ImageModel, MosaicModel
from stpipe import Pipeline, Step, crds_client

from ..lib.suffix import remove_suffix

_LOG_FORMATTER = logging.Formatter(
    "%(asctime)s.%(msecs)03dZ :: %(name)s :: %(levelname)s :: %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%S",
)
_LOG_FORMATTER.converter = time.gmtime


log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)


[docs] class RomanStep(Step): """ Base class for Roman calibration pipeline steps. """ spec = """ output_ext = string(default='.asdf') # Default type of output """ @classmethod def _datamodels_open(cls, init, **kwargs): """ Provide access to this package's datamodels.open function so that the stpipe infrastructure knows how to instantiate models. """ return rdm.open(init, **kwargs)
[docs] def finalize_result(self, model, reference_files_used): """ Hook that allows the Step to set metadata on the output model before save. Parameters ---------- model : roman_datamodels.datamodels.DataModel Output model. reference_files_used : list of tuple(str, str) List of reference files used. The first element of each tuple is the reftype code, the second element is the filename. """ model.meta.calibration_software_version = importlib.metadata.version("romancal") if isinstance(model, (ImageModel, MosaicModel)): for log_record in self.log_records: model.cal_logs.append(_LOG_FORMATTER.format(log_record)) if len(reference_files_used) > 0: for ref_name, ref_file in reference_files_used: if hasattr(model.meta.ref_file, ref_name): setattr(model.meta.ref_file, ref_name, ref_file) # getattr(model.meta.ref_file, ref_name).name = ref_file model.meta.ref_file.crds.sw_version = crds_client.get_svn_version() model.meta.ref_file.crds.context_used = crds_client.get_context_used( model.crds_observatory ) # this will only run if 'parent' is none, which happens when an individual # step is being run or if self is a RomanPipeline and not a RomanStep. if self.parent is None: log.info( f"Results used CRDS context: {model.meta.ref_file.crds.context_used}" )
[docs] def record_step_status(self, model, step_name, success=True): """ Record step completion status in the model's metadata. Parameters ---------- model : roman_datamodels.datamodels.DataModel Output model. step_name : str Calibration step name. success : bool If True, then the step was run successfully. """ # JWST sets model.meta.cal_step.<step name> here. Roman # may do the same, depending on how the metadata format # turns out. Seems like we might be able to combine this # with finalize_result somehow. pass
[docs] def remove_suffix(self, name): """ Remove any Roman step-specific suffix from the given filename. Parameters ---------- name : str Filename. Returns ------- str Filename with step suffix removed. """ # JWST maintains a list of relevant suffixes that is monitored # by tests to be up-to-date. Roman will likely need to do # something similar. return remove_suffix(name)
# RomanPipeline needs to inherit from Pipeline, but also # be a subclass of RomanStep so that it will pass checks # when constructing a pipeline using RomanStep class methods.
[docs] class RomanPipeline(Pipeline, RomanStep): pass