Source code for romancal.source_detection.source_detection_step

"""
Create a source catalog for tweakreg
"""

import logging

import astropy.units as u
import numpy as np
from asdf import AsdfFile
from astropy.stats import SigmaClip
from astropy.table import Table
from photutils.background import (
    Background2D,
    MeanBackground,
    MedianBackground,
    ModeEstimatorBackground,
)
from photutils.detection import DAOStarFinder
from roman_datamodels import datamodels as rdm
from roman_datamodels import maker_utils
from roman_datamodels.dqflags import pixel

from romancal.lib import psf
from romancal.stpipe import RomanStep

log = logging.getLogger(__name__)
log.setLevel(logging.DEBUG)

__all__ = ["SourceDetectionStep"]


class SourceDetectionStep(RomanStep):
    """
    SourceDetectionStep: Detect point-like sources in image to create
    a catalog for alignment in tweakreg.
    """

    spec = """
        kernel_fwhm = float(default=None)  # DAOStarFinder: Size of Gaussian kernel,
        # in pixels.
        sharplo = float(default=0.)  # DAOStarFinder: Lower bound for sharpness.
        # Typical values of sharpness range from 0 (flat) to 1 (delta function).
        sharphi = float(default=1.0)  # DAOStarFinder: Upper bound for sharpness.
        roundlo = float(default=-1.0)  # DAOStarFinder: Lower bound for roundness.
        # A circular source will have a zero roundness. A source extended in x or
        # y will have a negative or positive roundness, respectively.
        roundhi = float(default=1.0)  # DAOStarFinder: Upper bound for roundness.
        peakmax = float(default=100000.0)  # Upper limit on brightest pixel in sources.
        max_sources = float(default=None)  # Max number of sources, choosing brightest.
        scalar_threshold = float(default=None)  # Detection threshold, to
        # be used for entire image. Assumed to be in same units as data, and is
        # an absolute threshold over background.
        calc_threshold = boolean(default=True)  # Calculate a single absolute
        # detection threshold from image based on background.
        snr_threshold = float(default=3.0)  # if calc_threshold_img,
        # the SNR for the threshold image.
        bkg_estimator = string(default='median')  # if calc_threshold_img,
        # choice of mean, median, or mode.
        bkg_boxsize = integer(default=9)  # if calc_threshold_img,
        # size of box in pixels for 2D background.
        bkg_sigma = float(default=2.0)  # if calc_threshold_img,
        # n sigma for sigma clipping bkgrnd.
        bkg_filter_size = integer(default=3)  # if calc_threshold_img,
        # size of Gaussian kernel for background.
        save_catalogs = boolean(default=False)  # Save source catalog to file?
        # Will overwrite an existing catalog of the same name.
        output_cat_filetype = option('asdf', 'ecsv', default='asdf')  # Used if
        # save_catalogs=True - file type of output catalog.
        fit_psf = boolean(default=True)  # fit source PSFs for accurate astrometry
        min_separation = integer(default=11)  # don't find multiple sources closer
        # than this number of pixels
    """
    def process(self, input):
        with rdm.open(input, lazy_load=False) as input_model:
            if input_model.meta.exposure.type != "WFI_IMAGE":
                # Check to see if attempt to find sources in non-Image data
                log.info("Skipping source detection for spectral exposure.")
                input_model.meta.cal_step.source_detection = "SKIPPED"
                return input_model

            # remove units from data in this step.
            # DAOStarFinder requires unitless input
            if hasattr(input_model.data, "unit"):
                self.data = input_model.data.value
            else:
                self.data = input_model.data

            # mask DO_NOT_USE pixels
            self.coverage_mask = ((pixel.DO_NOT_USE) & input_model.dq).astype(bool)

            filt = input_model.meta.instrument["optical_element"]

            # if a pre-determined threshold value for detection for the whole
            # image is provided, use this
            if self.scalar_threshold is not None:
                threshold = float(self.scalar_threshold)
                log.info(f"Using a detection threshold of {threshold}.")
            # otherwise, if specified, calculate a scalar threshold from the
            # image by calculating a 2D background image, using this to create
            # a 2d threshold image, and using the median of the 2d threshold
            # image as the scalar detection threshold for the whole image
            elif self.calc_threshold:
                log.info("Determining detection threshold from image.")
                bkg = self._calc_2D_background()
                threshold_img = bkg.background + self.snr_threshold * bkg.background_rms
                threshold = np.median(threshold_img)
                log.info(f"Calculated a detection threshold of {threshold} from image.")
            else:
                threshold = np.median(self.data)

            if self.kernel_fwhm is None:
                # estimate the FWHM of the PSF using rough estimate
                # from the diffraction limit:
                central_wavelength = psf.filter_central_wavelengths[filt] * u.um
                diffraction_limit = float(1.22 * central_wavelength / (2.4 * u.m))
                assume_pixel_scale = 0.11 * u.arcsec / u.pix
                expect_psf_pix = np.sqrt(
                    ((diffraction_limit * u.rad).to(u.arcsec) / assume_pixel_scale) ** 2
                    + 1 * u.pix**2
                ).to_value(u.pix)
                self.kernel_fwhm = expect_psf_pix

            log.info("Detecting sources with DAOStarFinder, using entire image array.")

            daofind = DAOStarFinder(
                fwhm=self.kernel_fwhm,
                threshold=threshold,
                sharplo=self.sharplo,
                sharphi=self.sharphi,
                roundlo=self.roundlo,
                roundhi=self.roundhi,
                brightest=self.max_sources,
                peakmax=self.peakmax,
                min_separation=self.min_separation,
            )

            if self.scalar_threshold is not None:
                # if absolute threshold is provided
                sources = daofind(self.data, mask=self.coverage_mask)
            elif self.calc_threshold:
                # subtract background from data if calculating abs. threshold
                sources = daofind(self.data - bkg.background, mask=self.coverage_mask)
            else:
                sources = daofind(self.data, mask=self.coverage_mask)

            # reduce table to minimal number of columns, just source ID,
            # positions, and fluxes
            columns = ["id", "xcentroid", "ycentroid", "flux"]

            if sources:
                catalog = sources[columns]
                log.info(f"Found {len(catalog)} sources.")
            else:
                # if no sources were detected, return an empty table
                self.log.warning("No sources detected, returning empty catalog.")
                catalog = Table(
                    names=columns,
                    dtype=(int, np.float64, np.float64, np.float64),
                )

            if self.fit_psf and len(catalog):
                # refine astrometry by fitting PSF models to the detected sources
                log.info("Constructing a gridded PSF model.")
                detector = input_model.meta.instrument["detector"].replace("WFI", "SCA")
                gridded_psf_model, _ = psf.create_gridded_psf_model(
                    path_prefix="tmp",
                    filt=filt,
                    detector=detector,
                    overwrite=True,
                    logging_level="ERROR",
                )
                log.info(
                    "Fitting a PSF model to sources for improved astrometric precision."
                )
                psf_photometry_table, photometry = psf.fit_psf_to_image_model(
                    image_model=input_model,
                    psf_model=gridded_psf_model,
                    x_init=catalog["xcentroid"],
                    y_init=catalog["ycentroid"],
                    exclude_out_of_bounds=True,
                )

            catalog_as_recarray = catalog.as_array()

            # create meta.source_detection section in file.
            # if save_catalogs is True, this will be updated with the
            # attribute 'tweakreg_catalog_name' to point to the location
            # of the catalog on disk. If save_catalogs is False, this section
            # will be updated to contain the catalog to pass to TweakReg.
            # tweakreg_catalog_name will be saved to the final output file,
            # while tweakreg_catalog is intended to be deleted by TweakRegStep
            input_model.meta["source_detection"] = maker_utils.mk_source_detection()

            # if save_catalogs is True, also save the output catalog to a file
            # (format specified by output_cat_filetype) and add an attribute
            # to the file that contains the path to this file
            if self.save_catalogs:
                cat_filename = input_model.meta.filename.replace(".asdf", "")
                cat_filename += f"_tweakreg_catalog.{self.output_cat_filetype}"
                log.info(f"Saving catalog to file: {cat_filename}.")

                if self.output_cat_filetype == "asdf":
                    tree = {"tweakreg_catalog": catalog_as_recarray}
                    ff = AsdfFile(tree)
                    ff.write_to(cat_filename)
                else:
                    catalog.write(cat_filename, format="ascii.ecsv", overwrite=True)

                input_model.meta.source_detection["tweakreg_catalog_name"] = (
                    cat_filename
                )
            else:
                if self.fit_psf and len(catalog):
                    # PSF photometry centroid results are stored in an astropy table
                    # in columns "x_fit" and "y_fit", which we'll rename for
                    # compatibility with tweakreg:
                    catalog = psf_photometry_table.copy()

                    # rename the PSF photometry table's columns to match the
                    # expected columns in tweakreg:
                    for old_name, new_name in zip(
                        ["x_fit", "y_fit"], ["xcentroid", "ycentroid"]
                    ):
                        catalog.rename_column(old_name, new_name)

                    input_model.meta.source_detection["tweakreg_catalog"] = catalog
                else:
                    # only attach catalog to file if it's being passed to the next
                    # step and save_catalogs is False, since it is not in the schema
                    input_model.meta.source_detection["tweakreg_catalog"] = (
                        catalog_as_recarray
                    )

            input_model.meta.cal_step["source_detection"] = "COMPLETE"

        # just pass input model to next step - catalog is stored in meta
        return input_model
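
    # Worked example of the kernel_fwhm fallback in ``process`` above (a sketch,
    # not executed by the step; the F158 filter and its ~1.58 um central
    # wavelength are assumed here purely for illustration):
    #
    #     1.22 * 1.58e-6 m / 2.4 m  ~= 8.0e-7 rad  ~= 0.17 arcsec
    #     0.17 arcsec / (0.11 arcsec / pix)  ~= 1.5 pix
    #     sqrt(1.5**2 + 1**2)  ~= 1.8 pix  -> DAOStarFinder kernel FWHM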

    def _calc_2D_background(self):
        """Calculates a 2D background image.

        Calculates the background value for the input image in boxes specified
        by self.bkg_boxsize. A mean, median, or mode estimator may be used (set
        by `bkg_estimator`). The pixels in each box will be sigma clipped,
        using a sigma specified by `bkg_sigma`."""

        filter_size = (
            self.bkg_filter_size,
            self.bkg_filter_size,
        )  # square size specified
        box_size = np.asarray(self.bkg_boxsize).astype(int)  # must be integer

        if self.bkg_estimator == "median":
            bkg_estimator = MedianBackground()
        elif self.bkg_estimator == "mean":
            bkg_estimator = MeanBackground()
        elif self.bkg_estimator == "mode":
            bkg_estimator = ModeEstimatorBackground()
        else:
            raise ValueError("bkg_estimator must be one of 'mean', 'median', or 'mode'")

        sigma_clip = SigmaClip(self.bkg_sigma)

        try:
            bkg_2D = Background2D(
                self.data,
                box_size,
                filter_size=filter_size,
                coverage_mask=self.coverage_mask,
                sigma_clip=sigma_clip,
                bkg_estimator=bkg_estimator,
            )
        except ValueError:
            # use the entire unmasked array
            log.info(
                "Background could not be estimated in meshes. "
                "Using the entire unmasked array for background "
                f"estimation: bkg_boxsize={self.data.shape}."
            )

            bkg_2D = Background2D(
                self.data,
                self.data.shape,
                filter_size=filter_size,
                coverage_mask=self.coverage_mask,
                sigma_clip=sigma_clip,
                bkg_estimator=bkg_estimator,
                exclude_percentile=100.0,
            )

        return bkg_2D
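
# Reading back a saved catalog (a sketch; assumes the step was run with
# ``save_catalogs=True`` and ``output_cat_filetype='asdf'``, and the filename
# below is hypothetical):
#
#     import asdf
#     from astropy.table import Table
#
#     with asdf.open("r0000101001001001001_0001_wfi01_cal_tweakreg_catalog.asdf") as af:
#         catalog = Table(af.tree["tweakreg_catalog"])
#     # columns: id, xcentroid, ycentroid, flux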