Source code for spectral_cube.lower_dimensional_structures

from __future__ import print_function, absolute_import, division

import warnings
from astropy import units as u
from astropy import wcs
from astropy import convolution
from astropy.io.fits import Header, Card, HDUList, PrimaryHDU
from .io.core import determine_format
from . import spectral_axis
from .utils import SliceWarning
from .cube_utils import convert_bunit

import numpy as np
from astropy import convolution

from .base_class import (BaseNDClass, SpectralAxisMixinClass,
                         SpatialCoordMixinClass)
from . import cube_utils

__all__ = ['LowerDimensionalObject', 'Projection', 'Slice', 'OneDSpectrum']


[docs]class LowerDimensionalObject(u.Quantity, BaseNDClass): """ Generic class for 1D and 2D objects. """ @property def header(self): header = self._header # This inplace update is OK; it's not bad to overwrite WCS in this # header if self.wcs is not None: header.update(self.wcs.to_header()) header['BUNIT'] = self.unit.to_string(format='fits') for keyword in header: if 'NAXIS' in keyword: del header[keyword] header.insert(2, Card(keyword='NAXIS', value=self.ndim)) for ind,sh in enumerate(self.shape[::-1]): header.insert(3+ind, Card(keyword='NAXIS{0:1d}'.format(ind+1), value=sh)) if 'beam' in self.meta: header.update(self.meta['beam'].to_header_keywords()) return header @property def hdu(self): if self.wcs is None: hdu = PrimaryHDU(self.value) else: hdu = PrimaryHDU(self.value, header=self.header) hdu.header['BUNIT'] = self.unit.to_string(format='fits') if 'beam' in self.meta: hdu.header.update(self.meta['beam'].to_header_keywords()) return hdu
[docs] def write(self, filename, format=None, overwrite=False): """ Write the lower dimensional object to a file. Parameters ---------- filename : str The path to write the file to format : str The kind of file to write. (Currently limited to 'fits') overwrite : bool If True, overwrite ``filename`` if it exists """ if format is None: format = determine_format(filename) if format == 'fits': self.hdu.writeto(filename, clobber=overwrite) else: raise ValueError("Unknown format '{0}' - the only available " "format at this time is 'fits'")
[docs] def to(self, unit, equivalencies=[]): """ Return a new `~spectral_cube.lower_dimensional_structures.LowerDimensionalObject` of the same class with the specified unit. See `astropy.units.Quantity.to` for further details. """ converted_array = u.Quantity.to(self, unit, equivalencies=equivalencies).value # use private versions of variables, not the generated property # versions # Not entirely sure the use of __class__ here is kosher, but we do want # self.__class__, not super() new = self.__class__(value=converted_array, unit=unit, copy=True, wcs=self._wcs, meta=self._meta, mask=self._mask, header=self._header) return new
def __getitem__(self, key, **kwargs): """ Return a new `~spectral_cube.lower_dimensional_structures.LowerDimensionalObject` of the same class while keeping other properties fixed. """ new_qty = super(LowerDimensionalObject, self).__getitem__(key) if new_qty.ndim < 2: # do not return a projection return u.Quantity(new_qty) if self._wcs is not None: if ((isinstance(key, tuple) and any(isinstance(k, slice) for k in key) and len(key) > self.ndim)): # Example cases include: indexing tricks like [:,:,None] warnings.warn("Slice {0} cannot be used on this {1}-dimensional" " array's WCS. If this is intentional, you " " should use this {2}'s ``array`` or ``quantity``" " attribute." .format(key, self.ndim, type(self)), SliceWarning ) return self.quantity[key] else: newwcs = self._wcs[key] else: newwcs = None new = self.__class__(value=new_qty.value, unit=new_qty.unit, copy=False, wcs=newwcs, meta=self._meta, mask=self._mask, header=self._header, **kwargs) return new def __array_finalize__(self, obj): self._wcs = getattr(obj, '_wcs', None) self._meta = getattr(obj, '_meta', None) self._mask = getattr(obj, '_mask', None) self._header = getattr(obj, '_header', None) self._spectral_unit = getattr(obj, '_spectral_unit', None) super(LowerDimensionalObject, self).__array_finalize__(obj) @property def __array_priority__(self): return super(LowerDimensionalObject, self).__array_priority__*2 @property def array(self): """ Get a pure array representation of the LDO. Useful when multiplying and using numpy indexing tricks. """ return np.asarray(self) @property def quantity(self): """ Get a pure `~astropy.units.Quantity` representation of the LDO. """ return u.Quantity(self)
[docs]class Projection(LowerDimensionalObject, SpatialCoordMixinClass): def __new__(cls, value, unit=None, dtype=None, copy=True, wcs=None, meta=None, mask=None, header=None, beam=None, read_beam=False): if np.asarray(value).ndim != 2: raise ValueError("value should be a 2-d array") if wcs is not None and wcs.wcs.naxis != 2: raise ValueError("wcs should have two dimension") self = u.Quantity.__new__(cls, value, unit=unit, dtype=dtype, copy=copy).view(cls) self._wcs = wcs self._meta = {} if meta is None else meta self._mask = mask if header is not None: self._header = header else: self._header = Header() if beam is not None: self._beam = beam self.meta['beam'] = beam else: if "beam" in self.meta: self._beam = self.meta['beam'] elif read_beam: beam = cube_utils.try_load_beam(header) if beam is not None: self._beam = beam self.meta['beam'] = beam else: warnings.warn("Cannot load beam from header.") return self @property def beam(self): return self._beam @staticmethod
[docs] def from_hdu(hdu): ''' Return a projection from a FITS HDU. ''' if not len(hdu.shape) == 2: raise ValueError("HDU must contain two-dimensional data.") meta = {} mywcs = wcs.WCS(hdu.header) if "BUNIT" in hdu.header: unit = convert_bunit(hdu.header["BUNIT"]) meta["BUNIT"] = hdu.header["BUNIT"] else: unit = None beam = cube_utils.try_load_beam(hdu.header) self = Projection(hdu.data, unit=unit, wcs=mywcs, meta=meta, header=hdu.header, beam=beam) return self
[docs] def quicklook(self, filename=None, use_aplpy=True, aplpy_kwargs={}): """ Use `APLpy <https://pypi.python.org/pypi/APLpy>`_ to make a quick-look image of the projection. This will make the ``FITSFigure`` attribute available. If there are unmatched celestial axes, this will instead show an image without axis labels. Parameters ---------- filename : str or Non Optional - the filename to save the quicklook to. """ if use_aplpy: try: if not hasattr(self, 'FITSFigure'): import aplpy self.FITSFigure = aplpy.FITSFigure(self.hdu, **aplpy_kwargs) self.FITSFigure.show_grayscale() self.FITSFigure.add_colorbar() if filename is not None: self.FITSFigure.save(filename) except (wcs.InconsistentAxisTypesError, ImportError): self._quicklook_mpl(filename=filename) else: self._quicklook_mpl(filename=filename)
def _quicklook_mpl(self, filename=None): from matplotlib import pyplot self.figure = pyplot.imshow(self.value) if filename is not None: self.figure.savefig(filename)
[docs] def convolve_to(self, beam, convolve=convolution.convolve_fft): """ Convolve the image to a specified beam. Parameters ---------- beam : `radio_beam.Beam` The beam to convolve to convolve : function The astropy convolution function to use, either `astropy.convolution.convolve` or `astropy.convolution.convolve_fft` Returns ------- proj : `Projection` A Projection convolved to the given ``beam`` object. """ self._raise_wcs_no_celestial() if not hasattr(self, 'beam'): raise ValueError("No beam is contained in Projection.meta.") pixscale = wcs.utils.proj_plane_pixel_area(self.wcs.celestial)**0.5 * u.deg convolution_kernel = \ beam.deconvolve(self.beam).as_kernel(pixscale) newdata = convolve(self.value, convolution_kernel, normalize_kernel=True) self = Projection(newdata, unit=self.unit, wcs=self.wcs, meta=self.meta, header=self.header, beam=beam) return self
[docs] def reproject(self, header, order='bilinear'): """ Reproject the image into a new header. Parameters ---------- header : `astropy.io.fits.Header` A header specifying a cube in valid WCS order : int or str, optional The order of the interpolation (if ``mode`` is set to ``'interpolation'``). This can be either one of the following strings: * 'nearest-neighbor' * 'bilinear' * 'biquadratic' * 'bicubic' or an integer. A value of ``0`` indicates nearest neighbor interpolation. """ self._raise_wcs_no_celestial() try: from reproject.version import version except ImportError: raise ImportError("Requires the reproject package to be" " installed.") # Need version > 0.2 to work with cubes from distutils.version import LooseVersion if LooseVersion(version) < "0.3": raise Warning("Requires version >=0.3 of reproject. The current " "version is: {}".format(version)) from reproject import reproject_interp # TODO: Find the minimal footprint that contains the header and only reproject that # (see FITS_tools.regrid_cube for a guide on how to do this) newwcs = wcs.WCS(header) shape_out = [header['NAXIS{0}'.format(i + 1)] for i in range(header['NAXIS'])][::-1] newproj, newproj_valid = reproject_interp((self.value, self.header), newwcs, shape_out=shape_out, order=order) self = Projection(newproj, unit=self.unit, wcs=newwcs, meta=self.meta, header=header, read_beam=True) return self
[docs] def subimage(self, xlo='min', xhi='max', ylo='min', yhi='max'): """ Extract a region spatially. Parameters ---------- [xy]lo/[xy]hi : int or `astropy.units.Quantity` or `min`/`max` The endpoints to extract. If given as a quantity, will be interpreted as World coordinates. If given as a string or int, will be interpreted as pixel coordinates. """ self._raise_wcs_no_celestial() limit_dict = {'xlo': 0 if xlo == 'min' else xlo, 'ylo': 0 if ylo == 'min' else ylo, 'xhi': self.shape[1] if xhi == 'max' else xhi, 'yhi': self.shape[0] if yhi == 'max' else yhi} dims = {'x': 1, 'y': 0} for val in (xlo, ylo, xhi, yhi): if hasattr(val, 'unit') and not val.unit.is_equivalent(u.degree): raise u.UnitsError("The X and Y slices must be specified in " "degree-equivalent units.") for lim in limit_dict: limval = limit_dict[lim] if hasattr(limval, 'unit'): dim = dims[lim[0]] sl = [slice(0, 1)] sl.insert(dim, slice(None)) spine = self.world[sl][dim] val = np.argmin(np.abs(limval - spine)) if limval > spine.max() or limval < spine.min(): pass # log.warn("The limit {0} is out of bounds." # " Using min/max instead.".format(lim)) if lim[1:] == 'hi': # End-inclusive indexing: need to add one for the high # slice limit_dict[lim] = val + 1 else: limit_dict[lim] = val slices = [slice(limit_dict[xx + 'lo'], limit_dict[xx + 'hi']) for xx in 'yx'] return self[slices]
# A slice is just like a projection in every way
[docs]class Slice(Projection): pass
[docs]class OneDSpectrum(LowerDimensionalObject,SpectralAxisMixinClass): def __new__(cls, value, unit=None, dtype=None, copy=True, wcs=None, meta=None, mask=None, header=None, spectral_unit=None, beams=None): if np.asarray(value).ndim != 1: raise ValueError("value should be a 1-d array") if wcs is not None and wcs.wcs.naxis != 1: raise ValueError("wcs should have two dimension") self = u.Quantity.__new__(cls, value, unit=unit, dtype=dtype, copy=copy).view(cls) self._wcs = wcs self._meta = {} if meta is None else meta self._mask = mask if header is not None: self._header = header else: self._header = Header() self._spectral_unit = spectral_unit if spectral_unit is None: if 'CUNIT1' in self._header: self._spectral_unit = u.Unit(self._header['CUNIT1']) elif self._wcs is not None: self._spectral_unit = u.Unit(self._wcs.wcs.cunit[0]) if beams is not None: self.beams = beams return self @property def header(self): header = self._header # This inplace update is OK; it's not bad to overwrite WCS in this # header if self.wcs is not None: header.update(self.wcs.to_header()) header['BUNIT'] = self.unit.to_string(format='fits') header.insert(2, Card(keyword='NAXIS', value=self.ndim)) for ind,sh in enumerate(self.shape[::-1]): header.insert(3+ind, Card(keyword='NAXIS{0:1d}'.format(ind+1), value=sh)) # Preserve the spectrum's spectral units if 'CUNIT1' in header and self._spectral_unit != u.Unit(header['CUNIT1']): spectral_scale = spectral_axis.wcs_unit_scale(self._spectral_unit) header['CDELT1'] *= spectral_scale header['CRVAL1'] *= spectral_scale header['CUNIT1'] = self.unit.to_string(format='FITS') return header @property def spectral_axis(self): """ A `~astropy.units.Quantity` array containing the central values of each channel along the spectral axis. """ if self._wcs is None: spec_axis = np.arange(self.size) * u.dimensionless_unscaled else: spec_axis = self.wcs.wcs_pix2world(np.arange(self.size), 0)[0] * \ u.Unit(self.wcs.wcs.cunit[0]) if self._spectral_unit is not None: spec_axis = spec_axis.to(self._spectral_unit) return spec_axis
[docs] def quicklook(self, filename=None, drawstyle='steps-mid', **kwargs): """ Plot the spectrum with current spectral units in the currently open figure kwargs are passed to `matplotlib.pyplot.plot` Parameters ---------- filename : str or Non Optional - the filename to save the quicklook to. """ from matplotlib import pyplot ax = pyplot.gca() ax.plot(self.spectral_axis, self.value, drawstyle=drawstyle, **kwargs) ax.set_xlabel(self.wcs.wcs.cunit[0]) ax.set_ylabel(self.unit) if filename is not None: pyplot.gcf().savefig(filename)
[docs] def with_spectral_unit(self, unit, velocity_convention=None, rest_value=None): newwcs, newmeta = self._new_spectral_wcs(unit, velocity_convention=velocity_convention, rest_value=rest_value) newheader = self._nowcs_header.copy() newheader.update(newwcs.to_header()) wcs_cunit = u.Unit(newheader['CUNIT1']) newheader['CUNIT1'] = unit.to_string(format='FITS') newheader['CDELT1'] *= wcs_cunit.to(unit) return OneDSpectrum(value=self.value, unit=self.unit, wcs=newwcs, header=newheader, meta=newmeta, copy=False, spectral_unit=unit)
def __getitem__(self, key, **kwargs): try: beams = self.beams[key] except (AttributeError,TypeError): beams = None return super(OneDSpectrum, self).__getitem__(key, beams=beams) @property def hdu(self): if hasattr(self, 'beams'): warnings.warn("There are multiple beams for this spectrum that " "are being ignored when creating the HDU.") return super(OneDSpectrum, self).hdu @property def hdulist(self): with warnings.catch_warnings(): warnings.simplefilter("ignore") hdu = self.hdu beamhdu = cube_utils.beams_to_bintable(self.beams) return HDUList([hdu, beamhdu])
[docs] def spectral_interpolate(self, spectral_grid, suppress_smooth_warning=False): """ Resample the spectrum onto a specific grid Parameters ---------- spectral_grid : array An array of the spectral positions to regrid onto suppress_smooth_warning : bool If disabled, a warning will be raised when interpolating onto a grid that does not nyquist sample the existing grid. Disable this if you have already appropriately smoothed the data. Returns ------- cube : SpectralCube """ assert spectral_grid.ndim == 1 inaxis = self.spectral_axis.to(spectral_grid.unit) indiff = np.mean(np.diff(inaxis)) outdiff = np.mean(np.diff(spectral_grid)) # account for reversed axes if outdiff < 0: spectral_grid = spectral_grid[::-1] outdiff = np.mean(np.diff(spectral_grid)) specslice = slice(None) if indiff >= 0 else slice(None, None, -1) inaxis = inaxis[specslice] indiff = np.mean(np.diff(inaxis)) # insanity checks if indiff < 0 or outdiff < 0: raise ValueError("impossible.") assert np.all(np.diff(spectral_grid) > 0) assert np.all(np.diff(inaxis) > 0) np.testing.assert_allclose(np.diff(spectral_grid), outdiff, err_msg="Output grid must be linear") if outdiff > 2 * indiff and not suppress_smooth_warning: warnings.warn("Input grid has too small a spacing. The data should " "be smoothed prior to resampling.") newspec = np.empty([spectral_grid.size], dtype=self.dtype) #newmask = np.empty([spectral_grid.size], dtype='bool') # TODO: handle masks newspec = np.interp(spectral_grid.value, inaxis.value, self.value) newwcs = self.wcs.deepcopy() newwcs.wcs.crpix[0] = 1 newwcs.wcs.crval[0] = spectral_grid[0].value newwcs.wcs.cunit[0] = spectral_grid.unit.to_string('FITS') newwcs.wcs.cdelt[0] = outdiff.value newwcs.wcs.set() newheader = self._nowcs_header.copy() newheader.update(newwcs.to_header()) wcs_cunit = u.Unit(newheader['CUNIT1']) newheader['CUNIT1'] = spectral_grid.unit.to_string(format='FITS') newheader['CDELT1'] *= wcs_cunit.to(spectral_grid.unit) return OneDSpectrum(value=newspec, unit=self.unit, wcs=newwcs, header=newheader, meta=self.meta.copy(), copy=False, spectral_unit=spectral_grid.unit)
[docs] def spectral_smooth(self, kernel, convolve=convolution.convolve, **kwargs): """ Smooth the spectrum Parameters ---------- kernel : `~astropy.convolution.Kernel1D` A 1D kernel from astropy convolve : function The astropy convolution function to use, either `astropy.convolution.convolve` or `astropy.convolution.convolve_fft` kwargs : dict Passed to the convolve function """ newspec = convolve(self.value, kernel, normalize_kernel=True, **kwargs) return OneDSpectrum(value=newspec, unit=self.unit, wcs=self.wcs.copy(), header=self.header.copy(), meta=self.meta.copy(), copy=False, spectral_unit=self.spectral_axis.unit)