"""
Utilities for fitting model PSFs to rate images.
"""
import logging
import os
import astropy.units as u
import numpy as np
import webbpsf
from astropy.modeling.fitting import LevMarLSQFitter
from astropy.nddata import CCDData, bitmask
from astropy.table import Table
from photutils.background import LocalBackground
from photutils.detection import DAOStarFinder
from photutils.psf import (
GriddedPSFModel,
IterativePSFPhotometry,
PSFPhotometry,
SourceGrouper,
)
from roman_datamodels.datamodels import ImageModel
from roman_datamodels.dqflags import pixel
from webbpsf import conf, gridded_library, restart_logging
__all__ = [
"create_gridded_psf_model",
"fit_psf_to_image_model",
"dq_to_boolean_mask",
]
# set loggers to debug level by default:
log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)
# Phase C central wavelengths [micron], released by Goddard (Jan 2023):
# https://roman.ipac.caltech.edu/sims/Param_db.html#wfi_filters
filter_central_wavelengths = {
"F062": 0.620,
"F087": 0.869,
"F106": 1.060,
"F129": 1.293,
"F146": 1.464,
"F158": 1.577,
"F184": 1.842,
"F213": 2.125,
}
default_finder = DAOStarFinder(
# these defaults extracted from the
# romancal SourceDetectionStep
fwhm=1.0,
threshold=0.0,
sharplo=0.0,
sharphi=1.0,
roundlo=-1.0,
roundhi=1.0,
peakmax=None,
)
[docs]
def create_gridded_psf_model(
path_prefix,
filt,
detector,
oversample=11,
fov_pixels=9,
sqrt_n_psfs=2,
overwrite=False,
buffer_pixels=100,
instrument_options=None,
logging_level=None,
):
"""
Compute a gridded PSF model for one SCA via
`~webbpsf.gridded_library.CreatePSFLibrary`.
Parameters
----------
path_prefix : str or Path-like
Prefix to the output file path for the gridded PSF model
FITS file. A suffix denoting the detector name will
be appended.
filt : str
Filter name, starting with "F". For example: `"F184"`.
detector : str
Computed gridded PSF model for this SCA.
Examples include: `"SCA01"` or `"SCA18"`.
oversample : int, optional
Oversample factor, default is 11. See WebbPSF docs for details [1]_.
Choosing an odd number makes the pixel convolution more accurate.
fov_pixels : int, optional
Field of view width [pixels]. Default is 12.
See WebbPSF docs for details [1]_.
sqrt_n_psfs : int, optional
Square root of the number of PSFs to calculate, distributed uniformly
across the detector. Default is 4.
overwrite : bool, optional
Overwrite output file if one already exists. Default is False.
buffer_pixels : int, optional
Calculate a grid of PSFs distributed uniformly across the detector
at least ``buffer_pixels`` away from the detector edges. Default is 100.
instrument_options : dict, optional
Instrument configuration options passed to WebbPSF.
For example, WebbPSF assumes Roman pointing jitter consistent with
mission specs by default, but this can be turned off with:
``{'jitter': None, 'jitter_sigma': 0}``.
logging_level : str, optional
Set logging level by name if not `None`, otherwise inherit from
the romancal logger.
Returns
-------
gridmodel : `photutils.psf.GriddedPSFModel`
Gridded PSF model evaluated at several locations on one SCA.
model_psf_centroids : list of tuples
Pixel locations of the PSF models calculated for ``gridmodel``.
References
----------
.. [1] `WebbPSF documentation for `webbpsf.JWInstrument.calc_psf`
<https://webbpsf.readthedocs.io/en/latest/api/webbpsf.JWInstrument.html#webbpsf.JWInstrument.calc_psf>`_
"""
if int(sqrt_n_psfs) != sqrt_n_psfs:
raise ValueError(f"`sqrt_n_psfs` must be an integer, got {sqrt_n_psfs}.")
n_psfs = int(sqrt_n_psfs) ** 2
# webbpsf appends "_sca??.fits" to the requested path:
expected_output_path = f"{path_prefix}_{detector.lower()}.fits"
# Choose pixel boundaries for the grid of PSFs:
start_pix = 0
stop_pix = 4096
# Choose locations on detector for each PSF:
if sqrt_n_psfs != 1:
pixel_range = np.linspace(
start_pix + buffer_pixels, stop_pix - buffer_pixels, int(sqrt_n_psfs)
)
else:
pixel_range = [(start_pix + stop_pix) / 2]
# generate PSFs over a grid of detector positions [pix]
model_psf_centroids = [(int(x), int(y)) for y in pixel_range for x in pixel_range]
if not os.path.exists(expected_output_path) or overwrite:
if logging_level is None:
# pass along logging level from __name__'s logger to WebbPSF:
logging_level = logging.getLevelName(log.level)
# set the WebbPSF logging level (similar to webbpsf.utils.setup_logging):
conf.logging_level = logging_level
restart_logging(verbose=False)
wfi = webbpsf.roman.WFI()
wfi.filter = filt
if instrument_options is not None:
wfi.options.update(instrument_options)
# Initialize the PSF library
inst = gridded_library.CreatePSFLibrary(
instrument=wfi,
filter_name=filt,
detectors=detector.upper(),
num_psfs=n_psfs,
oversample=oversample,
fov_pixels=fov_pixels,
add_distortion=False,
crop_psf=False,
save=True,
filename=path_prefix,
overwrite=overwrite,
verbose=False,
)
inst.location_list = model_psf_centroids
# Create the PSF grid:
gridmodel = inst.create_grid()
elif os.path.exists(expected_output_path):
logging.log(
logging.INFO,
f"Loading existing gridded PSF model from {expected_output_path}",
)
psf_model = CCDData.read(expected_output_path, unit=u.electron / u.s, ext=0)
psf_model.meta = dict(psf_model.meta)
psf_model.meta["oversampling"] = oversample
psf_model.meta["grid_xypos"] = np.array(
[list(tup)[::-1] for tup in model_psf_centroids]
)
gridmodel = GriddedPSFModel(psf_model)
return gridmodel, model_psf_centroids
[docs]
def fit_psf_to_image_model(
image_model=None,
data=None,
error=None,
dq=None,
photometry_cls=PSFPhotometry,
psf_model=None,
grouper=None,
fitter=None,
localbkg_estimator=None,
finder=None,
x_init=None,
y_init=None,
progress_bar=False,
error_lower_limit=None,
fit_shape=(15, 15),
exclude_out_of_bounds=True,
):
"""
Fit PSF models to an ``ImageModel``.
Parameters
----------
image_model : `roman_datamodels.datamodels.ImageModel`
Image datamodel. If ``image_model`` is supplied,
``data,error,dq`` should be `None`.
data : `astropy.units.Quantity`
Fit a PSF model to the rate image ``data``.
If ``data,error,dq`` are supplied, ``image_model`` should be `None`.
error : `astropy.units.Quantity`
Uncertainties on fluxes in ``data``. Should be `None` if
``image_model`` is supplied.
dq : `numpy.ndarray`
Data quality bitmask for ``data``. Should be `None` if
``image_model`` is supplied.
photometry_cls : {`photutils.psf.PSFPhotometry`,
`photutils.psf.IterativePSFPhotometry`}
Choose a photutils PSF photometry technique (default or iterative).
psf_model : `astropy.modeling.Fittable2DModel`
The 2D PSF model to fit to the rate image. Usually this model is an instance
of `photutils.psf.GriddedPSFModel`.
grouper : `photutils.psf.SourceGrouper`
Specifies rules for attempting joint fits of multiple PSFs when
there are nearby sources at small separations.
fitter : `astropy.modeling.fitting.Fitter`, optional
Modeling class which optimizes the PSF fit.
Default is `astropy.modeling.fitting.LevMarLSQFitter(calc_uncertainties=True)`.
localbkg_estimator : `photutils.background.LocalBackground`, optional
Specifies inner and outer radii for computing flux background near
a source. Default has ``inner_radius=10, outer_radius=30``.
finder : subclass of `photutils.detection.StarFinderBase`, optional
When ``photutils_cls`` is `photutils.psf.IterativePSFPhotometry`, the
``finder`` is called to determine if sources remain in the rate image
after one PSF model is fit to the observations and removed.
Default was extracted from the `DAOStarFinder` call in the
Source Detection step.
x_init : `numpy.ndarray`, optional
Initial guesses for the ``x`` pixel coordinates of each source to fit.
y_init : `numpy.ndarray`, optional
Initial guesses for the ``y`` pixel coordinates of each source to fit.
progress_bar : bool, optional
Render a progress bar via photutils. Default is False.
error_lower_limit : `astropy.units.Quantity`, optional
Since some synthetic images may have bright sources with very
small statistical uncertainties, the ``error`` can be clipped at
``error_lower_limit`` to prevent over-confident fits.
fit_shape : int, or tuple of length 2, optional
Rectangular shape around the center of a star that will
be used to define the PSF-fitting data. See docs for
`photutils.psf.PSFPhotometry` for details. Default is ``(16, 16)``.
exclude_out_of_bounds : bool, optional
If `True`, do not attempt to fit stars which have initial centroids
that fall outside the pixel limits of the SCA. Default is False.
Returns
-------
results_table : `astropy.table.QTable`
PSF photometry results.
photometry : instance of class ``photutils_cls``
PSF photometry instance with configuration settings and results.
"""
if grouper is None:
# minimum separation before sources are fit simultaneously:
grouper = SourceGrouper(min_separation=5) # [pix]
if fitter is None:
fitter = LevMarLSQFitter(calc_uncertainties=True)
# the iterative PSF method requires a finder:
psf_photometry_kwargs = {}
if photometry_cls is IterativePSFPhotometry or (x_init is None and y_init is None):
if finder is None:
finder = default_finder
psf_photometry_kwargs["finder"] = finder
if localbkg_estimator is None:
localbkg_estimator = LocalBackground(
inner_radius=10, # [pix]
outer_radius=30, # [pix]
)
photometry = photometry_cls(
grouper=grouper,
localbkg_estimator=localbkg_estimator,
psf_model=psf_model,
fitter=fitter,
fit_shape=fit_shape,
aperture_radius=fit_shape[0],
progress_bar=progress_bar,
**psf_photometry_kwargs,
)
if x_init is not None and y_init is not None:
guesses = Table(np.column_stack([x_init, y_init]), names=["x_init", "y_init"])
else:
guesses = None
if image_model is None:
if data is None and error is None:
raise ValueError(
"PSF fitting requires either an ImageModel, "
"or arrays for the data and error."
)
ignore_flags = pixel.NO_LIN_CORR
# presently the linearity correction is somewhat problematic in
# CRDS reference files; we should replace this with ignore_flags = 0
# at some point in the future.
if dq is None:
if image_model is not None:
mask = dq_to_boolean_mask(image_model, ignore_flags=ignore_flags)
else:
mask = None
else:
mask = dq_to_boolean_mask(dq)
if data is None and image_model is not None:
data = image_model.data
if error is None and image_model is not None:
error = image_model.err
if error_lower_limit is not None:
# option to enforce a lower limit on the flux uncertainties
error = np.clip(error, error_lower_limit, None)
# we also mask non-finite values in the data and error arrays:
non_finite = ~np.isfinite(data) | ~np.isfinite(error)
if exclude_out_of_bounds and guesses is not None:
# don't attempt to fit PSFs for objects with initial centroids
# outside the detector boundaries:
init_centroid_in_range = (
(guesses["x_init"] > 0)
& (guesses["x_init"] < data.shape[0])
& (guesses["y_init"] > 0)
& (guesses["y_init"] < data.shape[1])
)
guesses = guesses[init_centroid_in_range]
# fit the model PSF to the data:
results_table = photometry(
data=data, error=error, init_params=guesses, mask=mask | non_finite
)
# results are stored on the PSFPhotometry instance:
return results_table, photometry
[docs]
def dq_to_boolean_mask(image_model_or_dq, ignore_flags=0, flag_map_name="ROMAN_DQ"):
"""
Convert a DQ bitmask to a boolean mask. Useful for photutils methods.
Parameters
----------
image_model_or_dq : `roman_datamodels.datamodels.ImageModel` or `numpy.ndarray`
ImageModel containing the DQ bitmask to convert to a boolean mask,
or the DQ bitmask itself.
ignore_flags : int, str, list, None (default = 0)
See docs for `astropy.nddata.bitmask.extend_bit_flag_map`
flag_map_name : str
Name for the bitmask flag map in the astropy bitmask registry
Returns
-------
mask : `numpy.ndarray`
Boolean mask
"""
if isinstance(image_model_or_dq, ImageModel):
dq = image_model_or_dq.dq
else:
dq = image_model_or_dq
# add the Roman DQ flags to the astropy bitmask registry:
dq_flag_map = {dq.name: dq.value for dq in pixel if dq.name != "GOOD"}
bitmask.extend_bit_flag_map(flag_map_name, **dq_flag_map)
# convert the bitmask to a boolean mask:
mask = bitmask.bitfield_to_boolean_mask(dq, ignore_flags=ignore_flags)
return mask.astype(bool)