Source code for spectral_cube.base_class

from astropy import units as u
from astropy import log
import numpy as np

from . import wcs_utils
from . import cube_utils
from .utils import cached, WCSCelestialError

__doctest_skip__ = ['SpatialCoordMixinClass.world']

DOPPLER_CONVENTIONS = {}
DOPPLER_CONVENTIONS['radio'] = u.doppler_radio
DOPPLER_CONVENTIONS['optical'] = u.doppler_optical
DOPPLER_CONVENTIONS['relativistic'] = u.doppler_relativistic


class BaseNDClass(object):

    _cache = {}

    @property
    def _nowcs_header(self):
        """
        Return a copy of the header with no WCS information attached
        """
        log.debug("Stripping WCS from header")
        return wcs_utils.strip_wcs_from_header(self._header)

    @property
    def wcs(self):
        return self._wcs

    @property
    def meta(self):
        return self._meta

    @property
    def mask(self):
        return self._mask


class SpatialCoordMixinClass(object):

    @property
    def _has_wcs_celestial(self):
        return self.wcs.has_celestial

    def _raise_wcs_no_celestial(self):
        if not self._has_wcs_celestial:
            raise WCSCelestialError("WCS does not contain two spatial axes.")

    @cube_utils.slice_syntax
    def world(self, view):
        """
        Return a list of the world coordinates in a cube, projection, or a view
        of it.

        SpatialCoordMixinClass.world is called with *bracket notation*, like
        a NumPy array::

            c.world[0:3, :, :]

        Returns
        -------
        [v, y, x] : list of NumPy arrays
            The 3 world coordinates at each pixel in the view. For a 2D image,
            the output is ``[y, x]``.


        Examples
        --------
        Extract the first 3 velocity channels of the cube:

        >>> v, y, x = c.world[0:3]

        Extract all the world coordinates:

        >>> v, y, x = c.world[:, :, :]

        Extract every other pixel along all axes:

        >>> v, y, x = c.world[::2, ::2, ::2]

        Extract all the world coordinates for a 2D image:

        >>> y, x = c.world[:, :]

        """

        self._raise_wcs_no_celestial()

        # note: view is a tuple of view

        # the next 3 lines are equivalent to (but more efficient than)
        # inds = np.indices(self._data.shape)
        # inds = [i[view] for i in inds]
        inds = np.ogrid[[slice(0, s) for s in self.shape]]
        inds = np.broadcast_arrays(*inds)
        inds = [i[view] for i in inds[::-1]]  # numpy -> wcs order

        shp = inds[0].shape
        inds = np.column_stack([i.ravel() for i in inds])
        world = self._wcs.all_pix2world(inds, 0).T

        world = [w.reshape(shp) for w in world]  # 1D->3D

        # apply units
        world = [w * u.Unit(self._wcs.wcs.cunit[i])
                 for i, w in enumerate(world)]

        # convert spectral unit if needed
        if hasattr(self, "_spectral_unit"):
            if self._spectral_unit is not None:
                specind = self.wcs.wcs.spec
                world[specind] = world[specind].to(self._spectral_unit)

        return world[::-1]  # reverse WCS -> numpy order

    def world_spines(self):
        """
        Returns a list of 1D arrays, for the world coordinates
        along each pixel axis.

        Raises error if this operation is ill-posed (e.g. rotated world
        coordinates, strong distortions)

        This method is not currently implemented. Use :meth:`world` instead.
        """
        raise NotImplementedError()

    @property
    def spatial_coordinate_map(self):
        view = [0 for ii in range(self.ndim - 2)] + [slice(None)] * 2
        return self.world[view][self.ndim - 2:]

    @property
    @cached
    def world_extrema(self):
        lat, lon = self.spatial_coordinate_map
        _lon_min = lon.min()
        _lon_max = lon.max()
        _lat_min = lat.min()
        _lat_max = lat.max()

        return ((_lon_min, _lon_max),
                (_lat_min, _lat_max))

    @property
    @cached
    def longitude_extrema(self):
        return self.world_extrema[0]

    @property
    @cached
    def latitude_extrema(self):
        return self.world_extrema[1]


class SpectralAxisMixinClass(object):

    def _new_spectral_wcs(self, unit, velocity_convention=None,
                          rest_value=None):
        """
        Returns a new WCS with a different Spectral Axis unit

        Parameters
        ----------
        unit : :class:`~astropy.units.Unit`
            Any valid spectral unit: velocity, (wave)length, or frequency.
            Only vacuum units are supported.
        velocity_convention : 'relativistic', 'radio', or 'optical'
            The velocity convention to use for the output velocity axis.
            Required if the output type is velocity. This can be either one
            of the above strings, or an `astropy.units` equivalency.
        rest_value : :class:`~astropy.units.Quantity`
            A rest wavelength or frequency with appropriate units.  Required if
            output type is velocity.  The cube's WCS should include this
            already if the *input* type is velocity, but the WCS's rest
            wavelength/frequency can be overridden with this parameter.

            .. note: This must be the rest frequency/wavelength *in vacuum*,
                     even if your cube has air wavelength units

        """
        from .spectral_axis import (convert_spectral_axis,
                                    determine_ctype_from_vconv)

        # Allow string specification of units, for example
        if not isinstance(unit, u.Unit):
            unit = u.Unit(unit)

        # Velocity conventions: required for frq <-> velo
        # convert_spectral_axis will handle the case of no velocity
        # convention specified & one is required
        if velocity_convention in DOPPLER_CONVENTIONS:
            velocity_convention = DOPPLER_CONVENTIONS[velocity_convention]
        elif (velocity_convention is not None and
              velocity_convention not in DOPPLER_CONVENTIONS.values()):
            raise ValueError("Velocity convention must be radio, optical, "
                             "or relativistic.")

        # If rest value is specified, it must be a quantity
        if (rest_value is not None and
            (not hasattr(rest_value, 'unit') or
             not rest_value.unit.is_equivalent(u.m, u.spectral()))):
            raise ValueError("Rest value must be specified as an astropy "
                             "quantity with spectral equivalence.")

        # Shorter versions to keep lines under 80
        ctype_from_vconv = determine_ctype_from_vconv

        meta = self._meta.copy()
        if 'Original Unit' not in self._meta:
            meta['Original Unit'] = self._wcs.wcs.cunit[self._wcs.wcs.spec]
            meta['Original Type'] = self._wcs.wcs.ctype[self._wcs.wcs.spec]

        out_ctype = ctype_from_vconv(self._wcs.wcs.ctype[self._wcs.wcs.spec],
                                     unit,
                                     velocity_convention=velocity_convention)

        newwcs = convert_spectral_axis(self._wcs, unit, out_ctype,
                                       rest_value=rest_value)

        newwcs.wcs.set()
        return newwcs, meta

    @property
    def spectral_axis(self):
        # spectral objects should be forced to implement this
        raise NotImplementedError