# -*- coding: utf-8 -*-
# Copyright (C) 2016-2019 by Brendt Wohlberg <brendt@ieee.org>
# Cristina Garcia-Cardona <cgarciac@lanl.gov>
# All rights reserved. BSD 3-clause License.
# This file is part of the SPORCO package. Details of the copyright
# and user license can be found in the 'LICENSE.txt' file distributed
# with the package.
"""FISTA algorithms for the CCMOD problem"""
from __future__ import division, absolute_import
import copy
import numpy as np
from sporco.fista import fista
from sporco.array import atleast_nd
from sporco.fft import (rfftn, irfftn, empty_aligned, rfftn_empty_aligned,
rfl2norm2)
from sporco.linalg import inner
from sporco.cnvrep import CDU_ConvRepIndexing, getPcn, bcrop
__author__ = """Cristina Garcia-Cardona <cgarciac@lanl.gov>"""
[docs]class ConvCnstrMOD(fista.FISTADFT):
r"""
Base class for FISTA algorithm for Convolutional Constrained MOD
problem :cite:`garcia-2018-convolutional1`.
|
.. inheritance-diagram:: ConvCnstrMOD
:parts: 2
|
Solve the optimisation problem
.. math::
\mathrm{argmin}_\mathbf{d} \;
(1/2) \sum_k \left\| \sum_m \mathbf{d}_m * \mathbf{x}_{k,m} -
\mathbf{s}_k \right\|_2^2 \quad \text{such that} \quad
\mathbf{d}_m \in C
via the FISTA problem
.. math::
\mathrm{argmin}_\mathbf{d} \;
(1/2) \sum_k \left\| \sum_m \mathbf{d}_m * \mathbf{x}_{k,m} -
\mathbf{s}_k \right\|_2^2 + \sum_m \iota_C(\mathbf{d}_m) \;\;,
where :math:`\iota_C(\cdot)` is the indicator function of feasible
set :math:`C` consisting of filters with unit norm and constrained
support. Multi-channel problems with input image channels
:math:`\mathbf{s}_{c,k}` are also supported, either as
.. math::
\mathrm{argmin}_\mathbf{d} \; (1/2) \sum_c \sum_k \left\| \sum_m
\mathbf{d}_m * \mathbf{x}_{c,k,m} - \mathbf{s}_{c,k} \right\|_2^2
\quad \text{such that} \quad \mathbf{d}_m \in C
with single-channel dictionary filters :math:`\mathbf{d}_m` and
multi-channel coefficient maps :math:`\mathbf{x}_{c,k,m}`, or
.. math::
\mathrm{argmin}_\mathbf{d} \; (1/2) \sum_c \sum_k \left\| \sum_m
\mathbf{d}_{c,m} * \mathbf{x}_{k,m} - \mathbf{s}_{c,k}
\right\|_2^2 \quad \text{such that} \quad \mathbf{d}_{c,m} \in C
with multi-channel dictionary filters :math:`\mathbf{d}_{c,m}` and
single-channel coefficient maps :math:`\mathbf{x}_{k,m}`. In this
latter case, normalisation of filters :math:`\mathbf{d}_{c,m}` is
performed jointly over index :math:`c` for each filter :math:`m`.
After termination of the :meth:`solve` method, attribute :attr:`itstat`
is a list of tuples representing statistics of each iteration. The
fields of the named tuple ``IterationStats`` are:
``Iter`` : Iteration number
``DFid`` : Value of data fidelity term :math:`(1/2) \sum_k \|
\sum_m \mathbf{d}_m * \mathbf{x}_{k,m} - \mathbf{s}_k \|_2^2`
``Cnstr`` : Constraint violation measure
``Rsdl`` : Residual
``L`` : Inverse of gradient step parameter
``Time`` : Cumulative run time
"""
[docs] class Options(fista.FISTADFT.Options):
r"""ConvCnstrMOD algorithm options
Options include all of those defined in
:class:`.fista.FISTADFT.Options`, together with
additional options:
``ZeroMean`` : Flag indicating whether the solution
dictionary :math:`\{\mathbf{d}_m\}` should have zero-mean
components.
"""
defaults = copy.deepcopy(fista.FISTADFT.Options.defaults)
defaults.update({'ZeroMean': False})
def __init__(self, opt=None):
"""
Parameters
----------
opt : dict or None, optional (default None)
ConvCnstrMOD algorithm options
"""
if opt is None:
opt = {}
fista.FISTADFT.Options.__init__(self, opt)
def __setitem__(self, key, value):
"""Set options."""
fista.FISTADFT.Options.__setitem__(self, key, value)
itstat_fields_objfn = ('DFid', 'Cnstr')
hdrtxt_objfn = ('DFid', 'Cnstr')
hdrval_objfun = {'DFid': 'DFid', 'Cnstr': 'Cnstr'}
def __init__(self, Z, S, dsz, opt=None, dimK=1, dimN=2):
"""
This class supports an arbitrary number of spatial dimensions,
`dimN`, with a default of 2. The input coefficient map array `Z`
(usually labelled X, but renamed here to avoid confusion with
the X and Y variables in the FISTA base class) is expected to
be in standard form as computed by the GenericConvBPDN class.
The input signal set `S` is either `dimN` dimensional (no
channels, only one signal), `dimN` +1 dimensional (either
multiple channels or multiple signals), or `dimN` +2 dimensional
(multiple channels and multiple signals). Parameter `dimK`, with
a default value of 1, indicates the number of multiple-signal
dimensions in `S`:
::
Default dimK = 1, i.e. assume input S is of form
S(N0, N1, C, K) or S(N0, N1, K)
If dimK = 0 then input S is of form
S(N0, N1, C, K) or S(N0, N1, C)
The internal data layout for S, D (X here), and X (Z here) is:
::
dim<0> - dim<Nds-1> : Spatial dimensions, product of N0,N1,... is N
dim<Nds> : C number of channels in S and D
dim<Nds+1> : K number of signals in S
dim<Nds+2> : M number of filters in D
sptl. chn sig flt
S(N0, N1, C, K, 1)
D(N0, N1, C, 1, M) (X here)
X(N0, N1, 1, K, M) (Z here)
The `dsz` parameter indicates the desired filter supports in the
output dictionary, since this cannot be inferred from the
input variables. The format is the same as the `dsz` parameter
of :func:`.cnvrep.bcrop`.
Parameters
----------
Z : array_like
Coefficient map array
S : array_like
Signal array
dsz : tuple
Filter support size(s)
opt : ccmod.Options object
Algorithm options
dimK : int, optional (default 1)
Number of dimensions for multiple signals in input S
dimN : int, optional (default 2)
Number of spatial dimensions
"""
# Set default options if none specified
if opt is None:
opt = ConvCnstrMOD.Options()
# Infer problem dimensions and set relevant attributes of self
self.cri = CDU_ConvRepIndexing(dsz, S, dimK=dimK, dimN=dimN)
# Call parent class __init__
xshape = self.cri.shpD
super(ConvCnstrMOD, self).__init__(xshape, S.dtype, opt)
# Set gradient step parameter
self.set_attr('L', opt['L'], dval=self.cri.K * 14.0, dtype=self.dtype)
# Reshape S to standard layout (Z, i.e. X in cbpdn, is assumed
# to be taken from cbpdn, and therefore already in standard
# form). If the dictionary has a single channel but the input
# (and therefore also the coefficient map array) has multiple
# channels, the channel index and multiple image index have
# the same behaviour in the dictionary update equation: the
# simplest way to handle this is to just reshape so that the
# channels also appear on the multiple image index.
if self.cri.Cd == 1 and self.cri.C > 1:
self.S = S.reshape(self.cri.Nv + (1,) +
(self.cri.C * self.cri.K,) + (1,))
else:
self.S = S.reshape(self.cri.shpS)
self.S = np.asarray(self.S, dtype=self.dtype)
# Compute signal S in DFT domain
self.Sf = rfftn(self.S, None, self.cri.axisN)
# Create constraint set projection function
self.Pcn = getPcn(dsz, self.cri.Nv, self.cri.dimN, self.cri.dimCd,
zm=opt['ZeroMean'])
# Create byte aligned arrays for FFT calls
self.Y = self.X
self.X = empty_aligned(self.Y.shape, dtype=self.dtype)
self.X[:] = self.Y
# Initialise auxiliary variable Vf: Create byte aligned arrays
# for FFT calls
self.Vf = rfftn_empty_aligned(self.X.shape, self.cri.axisN,
self.dtype)
self.Xf = rfftn(self.X, None, self.cri.axisN)
self.Yf = self.Xf
self.store_prev()
self.Yfprv = self.Yf.copy() + 1e5
# Initialization needed for back tracking (if selected)
self.postinitialization_backtracking_DFT()
if Z is not None:
self.setcoef(Z)
[docs] def setcoef(self, Z):
"""Set coefficient array."""
# If the dictionary has a single channel but the input (and
# therefore also the coefficient map array) has multiple
# channels, the channel index and multiple image index have
# the same behaviour in the dictionary update equation: the
# simplest way to handle this is to just reshape so that the
# channels also appear on the multiple image index.
if self.cri.Cd == 1 and self.cri.C > 1:
Z = Z.reshape(self.cri.Nv + (1,) + (self.cri.Cx * self.cri.K,) +
(self.cri.M,))
self.Z = np.asarray(Z, dtype=self.dtype)
self.Zf = rfftn(self.Z, self.cri.Nv, self.cri.axisN)
[docs] def getdict(self, crop=True):
"""Get final dictionary. If ``crop`` is ``True``, apply
:func:`.cnvrep.bcrop` to returned array.
"""
D = self.X
if crop:
D = bcrop(D, self.cri.dsz, self.cri.dimN)
return D
[docs] def eval_grad(self):
"""Compute gradient in Fourier domain."""
# Compute X D - S
Ryf = self.eval_Rf(self.Yf)
gradf = inner(np.conj(self.Zf), Ryf, axis=self.cri.axisK)
# Multiple channel signal, single channel dictionary
if self.cri.C > 1 and self.cri.Cd == 1:
gradf = np.sum(gradf, axis=self.cri.axisC, keepdims=True)
return gradf
[docs] def eval_Rf(self, Vf):
"""Evaluate smooth term in Vf."""
return inner(self.Zf, Vf, axis=self.cri.axisM) - self.Sf
[docs] def eval_proxop(self, V):
"""Compute proximal operator of :math:`g`."""
return self.Pcn(V)
[docs] def rsdl(self):
"""Compute fixed point residual in Fourier domain."""
diff = self.Xf - self.Yfprv
return rfl2norm2(diff, self.X.shape, axis=self.cri.axisN)
[docs] def eval_objfn(self):
"""Compute components of objective function as well as total
contribution to objective function.
"""
dfd = self.obfn_dfd()
cns = self.obfn_cns()
return (dfd, cns)
[docs] def obfn_dfd(self):
r"""Compute data fidelity term :math:`(1/2) \| \sum_m
\mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \|_2^2`.
"""
Ef = self.eval_Rf(self.Xf)
return rfl2norm2(Ef, self.S.shape, axis=self.cri.axisN) / 2.0
[docs] def obfn_cns(self):
r"""Compute constraint violation measure :math:`\|
P(\mathbf{y}) - \mathbf{y}\|_2`.
"""
return np.linalg.norm((self.Pcn(self.X) - self.X))
[docs] def obfn_f(self, Xf=None):
r"""Compute data fidelity term :math:`(1/2) \| \sum_m
\mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \|_2^2`.
This is used for backtracking. Since the backtracking is
computed in the DFT, it is important to preserve the
DFT scaling.
"""
if Xf is None:
Xf = self.Xf
Rf = self.eval_Rf(Xf)
return 0.5 * np.linalg.norm(Rf.flatten(), 2)**2
[docs] def reconstruct(self, D=None):
"""Reconstruct representation."""
if D is None:
Df = self.Xf
else:
Df = rfftn(D, None, self.cri.axisN)
Sf = np.sum(self.Zf * Df, axis=self.cri.axisM)
return irfftn(Sf, self.cri.Nv, self.cri.axisN)
[docs]class ConvCnstrMODMask(ConvCnstrMOD):
r"""
FISTA algorithm for Convolutional Constrained MOD problem
with a spatial mask :cite:`garcia-2018-convolutional1`.
|
.. inheritance-diagram:: ConvCnstrMODMask
:parts: 2
|
Solve the optimisation problem
.. math::
\mathrm{argmin}_\mathbf{d} \;
(1/2) \left\| W \left(\sum_m \mathbf{d}_m * \mathbf{x}_m -
\mathbf{s}\right) \right\|_2^2 \quad \text{such that} \quad
\mathbf{d}_m \in C \;\; \forall m
where :math:`C` is the feasible set consisting of filters with unit
norm and constrained support, and :math:`W` is a mask array, via the
FISTA problem
.. math::
\mathrm{argmin}_{\mathbf{d}} \; (1/2) \left\| W \left(X
\mathbf{d} - \mathbf{s}\right) \right\|_2^2 +
\iota_C(\mathbf{d}_m) \;\;,
where :math:`\iota_C(\cdot)` is the indicator function of feasible
set :math:`C`, and :math:`X \mathbf{d} = \sum_m \mathbf{x}_m *
\mathbf{d}_m`.
See :class:`ConvCnstrMOD` for interface details.
"""
[docs] class Options(ConvCnstrMOD.Options):
"""ConvCnstrMODMask algorithm options
Options include all of those defined in
:class:`.fista.FISTA.Options`.
"""
defaults = copy.deepcopy(ConvCnstrMOD.Options.defaults)
def __init__(self, opt=None):
"""
Parameters
----------
opt : dict or None, optional (default None)
ConvCnstrMODMask algorithm options
"""
if opt is None:
opt = {}
ConvCnstrMOD.Options.__init__(self, opt)
def __init__(self, Z, S, W, dsz, opt=None, dimK=None, dimN=2):
"""
Parameters
----------
Z : array_like
Coefficient map array
S : array_like
Signal array
W : array_like
Mask array. The array shape must be such that the array is
compatible for multiplication with the *internal* shape of
input array S (see :class:`.cnvrep.CDU_ConvRepIndexing` for a
discussion of the distinction between *external* and
*internal* data layouts).
dsz : tuple
Filter support size(s)
opt : :class:`ConvCnstrMODMask.Options` object
Algorithm options
dimK : 0, 1, or None, optional (default None)
Number of dimensions in input signal corresponding to multiple
independent signals
dimN : int, optional (default 2)
Number of spatial dimensions
"""
# Set default options if none specified
if opt is None:
opt = ConvCnstrMODMask.Options()
# Infer problem dimensions and set relevant attributes of self
self.cri = CDU_ConvRepIndexing(dsz, S, dimK=dimK, dimN=dimN)
# Append singleton dimensions to W if necessary
if hasattr(W, 'ndim'):
W = atleast_nd(self.cri.dimN + 3, W)
# Reshape W if necessary (see discussion of reshape of S in
# ccmod base class)
if self.cri.Cd == 1 and self.cri.C > 1 and hasattr(W, 'ndim'):
# In most cases broadcasting rules make it possible for W
# to have a singleton dimension corresponding to a
# non-singleton dimension in S. However, when S is
# reshaped to interleave axisC and axisK on the same axis,
# broadcasting is no longer sufficient unless axisC and
# axisK of W are either both singleton or both of the same
# size as the corresponding axes of S. If neither of these
# cases holds, it is necessary to replicate the axis of W
# (axisC or axisK) that does not have the same size as the
# corresponding axis of S.
shpw = list(W.shape)
swck = shpw[self.cri.axisC] * shpw[self.cri.axisK]
if swck > 1 and swck < self.cri.C * self.cri.K:
if W.shape[self.cri.axisK] == 1 and self.cri.K > 1:
shpw[self.cri.axisK] = self.cri.K
else:
shpw[self.cri.axisC] = self.cri.C
W = np.broadcast_to(W, shpw)
self.W = W.reshape(
W.shape[0:self.cri.dimN] +
(1, W.shape[self.cri.axisC] * W.shape[self.cri.axisK], 1))
else:
self.W = W
super(ConvCnstrMODMask, self).__init__(Z, S, dsz, opt, dimK, dimN)
# Create byte aligned arrays for FFT calls
self.WRy = empty_aligned(self.S.shape, dtype=self.dtype)
self.Ryf = rfftn_empty_aligned(self.S.shape, self.cri.axisN,
self.dtype)
[docs] def eval_grad(self):
"""Compute gradient in Fourier domain."""
# Compute X D - S
self.Ryf[:] = self.eval_Rf(self.Yf)
# Map to spatial domain to multiply by mask
Ry = irfftn(self.Ryf, self.cri.Nv, self.cri.axisN)
# Multiply by mask
self.WRy[:] = (self.W**2) * Ry
# Map back to frequency domain
WRyf = rfftn(self.WRy, self.cri.Nv, self.cri.axisN)
gradf = inner(np.conj(self.Zf), WRyf, axis=self.cri.axisK)
# Multiple channel signal, single channel dictionary
if self.cri.C > 1 and self.cri.Cd == 1:
gradf = np.sum(gradf, axis=self.cri.axisC, keepdims=True)
return gradf
[docs] def obfn_dfd(self):
r"""Compute data fidelity term :math:`(1/2) \sum_k \| W (\sum_m
\mathbf{d}_m * \mathbf{x}_{k,m} - \mathbf{s}_k) \|_2^2`
"""
Ef = self.eval_Rf(self.Xf)
E = irfftn(Ef, self.cri.Nv, self.cri.axisN)
return (np.linalg.norm(self.W * E)**2) / 2.0
[docs] def obfn_f(self, Xf=None):
r"""Compute data fidelity term :math:`(1/2) \sum_k \| W (\sum_m
\mathbf{d}_m * \mathbf{x}_{k,m} - \mathbf{s}_k) \|_2^2`.
This is used for backtracking. Since the backtracking is
computed in the DFT, it is important to preserve the
DFT scaling.
"""
if Xf is None:
Xf = self.Xf
Rf = self.eval_Rf(Xf)
R = irfftn(Rf, self.cri.Nv, self.cri.axisN)
WRf = rfftn(self.W * R, self.cri.Nv, self.cri.axisN)
return 0.5 * np.linalg.norm(WRf.flatten(), 2)**2