# -*- coding: utf-8 -*-
# Copyright (C) 2016-2019 by Brendt Wohlberg <brendt@ieee.org>
# All rights reserved. BSD 3-clause License.
# This file is part of the SPORCO package. Details of the copyright
# and user license can be found in the 'LICENSE.txt' file distributed
# with the package.
"""Classes for ADMM algorithms for convolutional sparse coding with
Total Variation regularisation terms"""
from __future__ import division, print_function
from builtins import range
import copy
import numpy as np
from sporco.admm import admm
import sporco.cnvrep as cr
from sporco.admm import cbpdn
import sporco.linalg as sl
import sporco.prox as sp
from sporco.util import u
from sporco.fft import (rfftn, irfftn, empty_aligned, rfftn_empty_aligned,
rfl2norm2)
from sporco.signal import gradient_filters
__author__ = """Brendt Wohlberg <brendt@ieee.org>"""
[docs]
class ConvBPDNScalarTV(admm.ADMM):
r"""
ADMM algorithm for an extension of Convolutional BPDN including
terms penalising the total variation of each coefficient map
:cite:`wohlberg-2017-convolutional`.
|
.. inheritance-diagram:: ConvBPDNScalarTV
:parts: 2
|
Solve the optimisation problem
.. math::
\mathrm{argmin}_\mathbf{x} \; \frac{1}{2}
\left\| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s}
\right\|_2^2 + \lambda \sum_m \| \mathbf{x}_m \|_1 +
\mu \sum_m \left\| \sqrt{\sum_i (G_i \mathbf{x}_m)^2} \right\|_1
\;\;,
where :math:`G_i` is an operator computing the derivative along index
:math:`i`, via the ADMM problem
.. math::
\mathrm{argmin}_\mathbf{x} \; (1/2) \left\| D \mathbf{x} -
\mathbf{s} \right\|_2^2 + \lambda
\| \mathbf{y}_L \|_1 + \mu \sum_m \left\| \sqrt{\sum_{i=0}^{L-1}
\mathbf{y}_i^2} \right\|_1 \quad \text{ such that } \quad
\left( \begin{array}{c} \Gamma_0 \\ \Gamma_1 \\ \vdots \\ I
\end{array} \right) \mathbf{x} =
\left( \begin{array}{c} \mathbf{y}_0 \\
\mathbf{y}_1 \\ \vdots \\ \mathbf{y}_L \end{array}
\right) \;\;,
where
.. math::
D = \left( \begin{array}{ccc} D_0 & D_1 & \ldots \end{array} \right)
\qquad
\mathbf{x} = \left( \begin{array}{c} \mathbf{x}_0 \\ \mathbf{x}_1 \\
\vdots \end{array} \right) \qquad
\Gamma_i = \left( \begin{array}{ccc}
G_i & 0 & \ldots \\ 0 & G_i & \ldots \\ \vdots & \vdots & \ddots
\end{array} \right) \;\;.
For multi-channel signals with a single-channel dictionary, scalar TV is
applied independently to each coefficient map for channel :math:`c` and
filter :math:`m`. Since multi-channel signals with a multi-channel
dictionary also have one coefficient map per filter, the behaviour is
the same as for single-channel signals.
After termination of the :meth:`solve` method, attribute :attr:`itstat`
is a list of tuples representing statistics of each iteration. The
fields of the named tuple ``IterationStats`` are:
``Iter`` : Iteration number
``ObjFun`` : Objective function value
``DFid`` : Value of data fidelity term :math:`(1/2) \| \sum_m
\mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \|_2^2`
``RegL1`` : Value of regularisation term :math:`\sum_m \|
\mathbf{x}_m \|_1`
``RegTV`` : Value of regularisation term :math:`\sum_m \left\|
\sqrt{\sum_i (G_i \mathbf{x}_m)^2} \right\|_1`
``PrimalRsdl`` : Norm of primal residual
``DualRsdl`` : Norm of dual residual
``EpsPrimal`` : Primal residual stopping tolerance
:math:`\epsilon_{\mathrm{pri}}`
``EpsDual`` : Dual residual stopping tolerance
:math:`\epsilon_{\mathrm{dua}}`
``Rho`` : Penalty parameter
``XSlvRelRes`` : Relative residual of X step solver
``Time`` : Cumulative run time
"""
[docs]
class Options(cbpdn.ConvBPDN.Options):
    r"""ConvBPDNScalarTV algorithm options

    Options include all of those defined in
    :class:`.admm.cbpdn.ConvBPDN.Options`, together with an additional
    option:

      ``TVWeight`` : An array of weights :math:`w_m` for the term
      penalising the gradient of the coefficient maps. If this option
      is defined, the regularization term is :math:`\sum_m w_m
      \left\| \sqrt{\sum_i (G_i \mathbf{x}_m)^2} \right\|_1` where
      :math:`w_m` is the weight for filter index :math:`m`. The array
      should be an :math:`M`-vector where :math:`M` is the number of
      filters in the dictionary.
    """

    # Start from the parent defaults and add the TV weighting option
    defaults = copy.deepcopy(cbpdn.ConvBPDN.Options.defaults)
    defaults.update({'TVWeight' : 1.0})

    def __init__(self, opt=None):
        """
        Parameters
        ----------
        opt : dict or None, optional (default None)
            ConvBPDNScalarTV algorithm options
        """
        cbpdn.ConvBPDN.Options.__init__(self, {} if opt is None else opt)
# Names of the objective-function components recorded in IterationStats
itstat_fields_objfn = ('ObjFun', 'DFid', 'RegL1', 'RegTV')
# Extra per-iteration statistic: relative residual of the X step solver
itstat_fields_extra = ('XSlvRelRes',)
# Column headers for iteration status display
hdrtxt_objfn = ('Fnc', 'DFid', u('Regℓ1'), u('RegTV'))
# Mapping from display headers to IterationStats field names
hdrval_objfun = {'Fnc': 'ObjFun', 'DFid': 'DFid',
                 u('Regℓ1'): 'RegL1', u('RegTV'): 'RegTV'}
def __init__(self, D, S, lmbda, mu=0.0, opt=None, dimK=None, dimN=2):
    """
    |

    **Call graph**

    .. image:: ../_static/jonga/cbpdnstv_init.svg
       :width: 20%
       :target: ../_static/jonga/cbpdnstv_init.svg

    |

    Parameters
    ----------
    D : array_like
        Dictionary matrix
    S : array_like
        Signal vector or matrix
    lmbda : float
        Regularisation parameter (l1)
    mu : float
        Regularisation parameter (gradient)
    opt : :class:`ConvBPDNScalarTV.Options` object
        Algorithm options
    dimK : 0, 1, or None, optional (default None)
        Number of dimensions in input signal corresponding to multiple
        independent signals
    dimN : int, optional (default 2)
        Number of spatial dimensions
    """
    if opt is None:
        opt = ConvBPDNScalarTV.Options()

    # Infer problem dimensions and set relevant attributes of self
    self.cri = cr.CSC_ConvRepIndexing(D, S, dimK=dimK, dimN=dimN)

    # Call parent class __init__. NB: np.prod is used rather than the
    # np.product alias, which was deprecated and removed in NumPy 2.0
    Nx = np.prod(np.array(self.cri.shpX))
    # Extra final axis of Y indexes the L gradient blocks plus the
    # identity block of the ADMM split variable
    yshape = self.cri.shpX + (len(self.cri.axisN)+1,)
    super(ConvBPDNScalarTV, self).__init__(Nx, yshape, yshape,
                                           S.dtype, opt)

    # Set l1 term scaling and weight array
    self.lmbda = self.dtype.type(lmbda)
    self.Wl1 = np.asarray(opt['L1Weight'], dtype=self.dtype)
    self.Wl1 = self.Wl1.reshape(cr.l1Wshape(self.Wl1, self.cri))

    self.mu = self.dtype.type(mu)
    if hasattr(opt['TVWeight'], 'ndim') and opt['TVWeight'].ndim > 0:
        # Reshape TV weight array for broadcasting against the
        # coefficient array (weights vary along the filter axis)
        self.Wtv = np.asarray(opt['TVWeight'].reshape(
            (1,)*(dimN + 2) + opt['TVWeight'].shape), dtype=self.dtype)
    else:
        # Wtv is a scalar: no need to change shape
        self.Wtv = np.asarray(opt['TVWeight'], dtype=self.dtype)

    # Set penalty parameter
    self.set_attr('rho', opt['rho'], dval=(50.0*self.lmbda + 1.0),
                  dtype=self.dtype)
    # Set rho_xi attribute
    self.set_attr('rho_xi', opt['AutoRho', 'RsdlTarget'], dval=1.0,
                  dtype=self.dtype)

    # Reshape D and S to standard layout
    self.D = np.asarray(D.reshape(self.cri.shpD), dtype=self.dtype)
    self.S = np.asarray(S.reshape(self.cri.shpS), dtype=self.dtype)

    # Compute signal in DFT domain
    self.Sf = rfftn(self.S, None, self.cri.axisN)
    # Gradient filters in the DFT domain and their squared magnitudes
    self.Gf, GHGf = gradient_filters(self.cri.dimN+3, self.cri.axisN,
                                     self.cri.Nv, dtype=self.dtype)
    self.GHGf = self.Wtv**2 * GHGf

    # Initialise byte-aligned arrays for pyfftw
    self.YU = empty_aligned(self.Y.shape, dtype=self.dtype)
    self.Xf = rfftn_empty_aligned(self.cri.shpX, self.cri.axisN,
                                  self.dtype)

    self.setdict()
[docs]
def setdict(self, D=None):
    """Set dictionary array.

    Parameters
    ----------
    D : array_like, optional (default None)
        New dictionary array; if None, retain the current dictionary
        and recompute the derived quantities from it.
    """
    if D is not None:
        self.D = np.asarray(D, dtype=self.dtype)
    # Dictionary in DFT domain
    self.Df = rfftn(self.D, self.cri.Nv, self.cri.axisN)
    # Compute D^H S
    self.DSf = np.conj(self.Df) * self.Sf
    if self.cri.Cd > 1:
        # Multi-channel dictionary: sum over the channel axis
        self.DSf = np.sum(self.DSf, axis=self.cri.axisC, keepdims=True)
    if self.opt['HighMemSolve'] and self.cri.Cd == 1:
        # Pre-compute and cache quantities for the x-step linear solver
        # (trades memory for speed)
        self.c = sl.solvedbi_sm_c(
            self.Df, np.conj(self.Df), self.rho*self.GHGf + self.rho,
            self.cri.axisM)
    else:
        self.c = None
[docs]
def rhochange(self):
    """Update cached ``c`` array when the penalty parameter rho changes."""
    # The cached solver quantities depend on rho, so they must be
    # recomputed; nothing is cached unless HighMemSolve is enabled
    # with a single-channel dictionary
    if not (self.opt['HighMemSolve'] and self.cri.Cd == 1):
        return
    diag = self.rho * self.GHGf + self.rho
    self.c = sl.solvedbi_sm_c(self.Df, np.conj(self.Df), diag,
                              self.cri.axisM)
[docs]
def xstep(self):
    r"""Minimise Augmented Lagrangian with respect to
    :math:`\mathbf{x}`.

    The solution is computed in the DFT domain, where the convolution
    and gradient operators are diagonalised.
    """
    self.YU[:] = self.Y - self.U
    YUf = rfftn(self.YU, None, self.cri.axisN)
    # The sum is over the extra axis indexing spatial gradient
    # operators G_i, *not* over axisM
    b = self.DSf + self.rho*(YUf[..., -1] + self.Wtv * np.sum(
        np.conj(self.Gf) * YUf[..., 0:-1], axis=-1))

    if self.cri.Cd == 1:
        # Single-channel dictionary: solve the diagonal block linear
        # system, using the cached quantities self.c if available
        self.Xf[:] = sl.solvedbi_sm(
            self.Df, self.rho*self.GHGf + self.rho, b, self.c,
            self.cri.axisM)
    else:
        # Multi-channel dictionary: iterated solver over the channel axis
        self.Xf[:] = sl.solvemdbi_ism(
            self.Df, self.rho*self.GHGf + self.rho, b, self.cri.axisM,
            self.cri.axisC)

    self.X = irfftn(self.Xf, self.cri.Nv, self.cri.axisN)

    if self.opt['LinSolveCheck']:
        # Verify the linear solve by computing the relative residual
        # of A^H A x + (rho G^H G + rho) x against the RHS b
        Dop = lambda x: sl.inner(self.Df, x, axis=self.cri.axisM)
        if self.cri.Cd == 1:
            DHop = lambda x: np.conj(self.Df) * x
        else:
            DHop = lambda x: sl.inner(np.conj(self.Df), x,
                                      axis=self.cri.axisC)
        ax = DHop(Dop(self.Xf)) + (self.rho*self.GHGf + self.rho)*self.Xf
        self.xrrs = sl.rrs(ax, b)
    else:
        self.xrrs = None
[docs]
def ystep(self):
    r"""Minimise Augmented Lagrangian with respect to
    :math:`\mathbf{y}`."""
    work = self.AX + self.U
    # Gradient blocks: grouped l2 shrinkage; identity block: soft
    # thresholding weighted by Wl1
    self.Y[..., :-1] = sp.prox_l2(work[..., :-1], self.mu / self.rho)
    self.Y[..., -1] = sp.prox_l1(work[..., -1],
                                 (self.lmbda / self.rho) * self.Wl1)
[docs]
def obfn_fvarf(self):
    """Variable to be evaluated in computing data fidelity term,
    depending on ``fEvalX`` option value.
    """
    if self.opt['fEvalX']:
        return self.Xf
    return rfftn(self.Y[..., -1], None, self.cri.axisN)
[docs]
def var_y0(self):
    r"""Get :math:`\mathbf{y}_0` variable: all blocks of
    :math:`\mathbf{y}` corresponding to a gradient operator."""
    return self.Y[..., :-1]
[docs]
def var_y1(self):
    r"""Get :math:`\mathbf{y}_1` variable: the single block of
    :math:`\mathbf{y}` corresponding to the identity operator
    (returned with the trailing block axis retained)."""
    idx = np.s_[..., -1:]
    return self.Y[idx]
[docs]
def var_yx(self):
    r"""Get the component block of :math:`\mathbf{y}` constrained to
    equal :math:`\mathbf{x}` (the final block, trailing axis dropped)."""
    idx = (Ellipsis, -1)
    return self.Y[idx]
[docs]
def var_yx_idx(self):
    r"""Get index expression selecting the component block of
    :math:`\mathbf{y}` that is constrained to be equal to
    :math:`\mathbf{x}` (the final entry of the trailing block axis).
    """
    return (Ellipsis, -1)
[docs]
def getmin(self):
    """Get minimiser after optimisation (X or the identity block of Y,
    depending on the ``ReturnX`` option)."""
    if self.opt['ReturnX']:
        return self.X
    return self.var_y1()[..., 0]
[docs]
def getcoef(self):
    """Get final coefficient array (alias for :meth:`getmin`)."""
    return self.getmin()
[docs]
def obfn_g0var(self):
    """Variable to be evaluated in computing the TV regularisation
    term, depending on the ``gEvalY`` option value.
    """
    if self.opt['gEvalY']:
        return self.var_y0()
    # Using self.AXnr[..., 0:-1] rather than self.cnst_A0(None, self.Xf)
    # avoids redundant calls to self.cnst_A0
    return self.AXnr[..., :-1]
[docs]
def obfn_g1var(self):
    r"""Variable to be evaluated in computing the :math:`\ell_1`
    regularisation term, depending on the ``gEvalY`` option value.
    """
    if self.opt['gEvalY']:
        return self.var_y1()
    # Using self.AXnr[..., -1:] rather than self.cnst_A1(self.X)
    # avoids redundant calls to self.cnst_A1
    return self.AXnr[..., -1:]
[docs]
def obfn_gvar(self):
    """Compatibility shim: expose the same variable as
    :meth:`obfn_g1var`, matching the :class:`.admm.cbpdn.ConvBPDN`
    interface so wrappers such as :class:`.AddMaskSim` can be used.
    """
    return self.obfn_g1var()
[docs]
def eval_objfn(self):
    """Compute components of objective function as well as total
    contribution to objective function.
    """
    fid = self.obfn_dfd()
    regs = self.obfn_reg()
    # regs[0] is the total weighted regularisation contribution;
    # the remaining entries are the individual unweighted terms
    return (fid + regs[0], fid) + regs[1:]
[docs]
def obfn_dfd(self):
    r"""Compute data fidelity term :math:`(1/2) \| \sum_m \mathbf{d}_m *
    \mathbf{x}_m - \mathbf{s} \|_2^2`.
    """
    # Residual in the DFT domain
    Rf = sl.inner(self.Df, self.obfn_fvarf(), axis=self.cri.axisM) - self.Sf
    return 0.5 * rfl2norm2(Rf, self.S.shape, axis=self.cri.axisN)
[docs]
def obfn_reg(self):
    """Compute regularisation terms and their contribution to the
    objective function value.
    """
    # l1 term: sum of absolute values of the weighted coefficients
    rl1 = np.abs(self.Wl1 * self.obfn_g1var()).sum()
    # TV term: l1 norm of the pointwise l2 norm over gradient blocks
    gsq = np.sum(self.obfn_g0var()**2, axis=-1)
    rtv = np.sqrt(gsq).sum()
    return (self.lmbda*rl1 + self.mu*rtv, rl1, rtv)
[docs]
def cnst_A0(self, X, Xf=None):
    r"""Compute :math:`A_0 \mathbf{x}` component of ADMM problem
    constraint. In this case :math:`A_0 \mathbf{x} = (\Gamma_0^T \;\;
    \Gamma_1^T \;\; \ldots )^T \mathbf{x}`.

    Parameters
    ----------
    X : array_like
        Coefficient array in the spatial domain
    Xf : array_like, optional (default None)
        DFT of X; computed from X if not supplied
    """
    if Xf is None:
        Xf = rfftn(X, axes=self.cri.axisN)
    # Gradients are applied by pointwise multiplication with Gf in the
    # DFT domain; the appended trailing axis indexes the gradient
    # operators, and the result is scaled by the TV weights
    return self.Wtv[..., np.newaxis] * irfftn(
        self.Gf * Xf[..., np.newaxis], self.cri.Nv, axes=self.cri.axisN)
[docs]
def cnst_A0T(self, X):
    r"""Compute :math:`A_0^T \mathbf{x}` where :math:`A_0 \mathbf{x}`
    is a component of the ADMM problem constraint. In this case
    :math:`A_0^T \mathbf{x} = (\Gamma_0^T \;\; \Gamma_1^T \;\; \ldots )
    \mathbf{x}`.
    """
    Xf = rfftn(X, axes=self.cri.axisN)
    # Adjoint applies the conjugate gradient filters; only the gradient
    # blocks X[..., 0:-1] participate (the final block belongs to the
    # identity operator)
    return self.Wtv[..., np.newaxis] * irfftn(
        np.conj(self.Gf) * Xf[..., 0:-1], self.cri.Nv, axes=self.cri.axisN)
[docs]
def cnst_A1(self, X):
    r"""Compute :math:`A_1 \mathbf{x}` component of ADMM problem
    constraint; here :math:`A_1 = I`, so this just appends the block
    axis to the input.
    """
    return X[..., None]
[docs]
def cnst_A1T(self, X):
    r"""Compute :math:`A_1^T \mathbf{x}` where :math:`A_1 \mathbf{x}`
    is a component of the ADMM problem constraint; here
    :math:`A_1 = I`, so this selects the final block of the input.
    """
    idx = (Ellipsis, -1)
    return X[idx]
[docs]
def cnst_A(self, X, Xf=None):
    r"""Compute :math:`A \mathbf{x}` component of ADMM problem
    constraint. In this case :math:`A \mathbf{x} = (\Gamma_0^T \;\;
    \Gamma_1^T \;\; \ldots \;\; I)^T \mathbf{x}`.
    """
    blocks = (self.cnst_A0(X, Xf), self.cnst_A1(X))
    return np.concatenate(blocks, axis=-1)
[docs]
def cnst_AT(self, X):
    r"""Compute :math:`A^T \mathbf{x}` where :math:`A \mathbf{x}` is
    a component of ADMM problem constraint. In this case
    :math:`A^T \mathbf{x} = (\Gamma_0^T \;\; \Gamma_1^T \;\; \ldots
    \;\; I) \mathbf{x}`.
    """
    grad_part = np.sum(self.cnst_A0T(X), axis=-1)
    ident_part = self.cnst_A1T(X)
    return grad_part + ident_part
[docs]
def cnst_B(self, Y):
r"""Compute :math:`B \mathbf{y}` component of ADMM problem constraint.
In this case :math:`B \mathbf{y} = -\mathbf{y}`.
"""
return -Y
[docs]
def cnst_c(self):
r"""Compute constant component :math:`\mathbf{c}` of ADMM problem
constraint. In this case :math:`\mathbf{c} = \mathbf{0}`.
"""
return 0.0
[docs]
def relax_AX(self):
    """Implement relaxation if option ``RelaxParam`` != 1.0."""
    # We need to keep the non-relaxed version of AX since it is
    # required for computation of primal residual r
    self.AXnr = self.cnst_A(self.X, self.Xf)
    if self.rlx == 1.0:
        # If RelaxParam option is 1.0 there is no relaxation
        self.AX = self.AXnr
    else:
        # Avoid calling cnst_c() more than once in case it is expensive
        # (e.g. due to allocation of a large block of memory)
        if not hasattr(self, '_cnst_c'):
            self._cnst_c = self.cnst_c()
        # Compute relaxed version of AX: a convex-style combination of
        # the actual AX and the value -(B y - c) it is driven towards
        alpha = self.rlx
        self.AX = alpha*self.AXnr - (1-alpha)*(self.cnst_B(self.Y) -
                                               self._cnst_c)
[docs]
def reconstruct(self, X=None):
    """Reconstruct representation (sum over filters of the dictionary
    convolved with the coefficient maps)."""
    Xf = self.Xf if X is None else rfftn(X, None, self.cri.axisN)
    Sf = np.sum(self.Df * Xf, axis=self.cri.axisM)
    return irfftn(Sf, self.cri.Nv, self.cri.axisN)
[docs]
class ConvBPDNVectorTV(ConvBPDNScalarTV):
r"""
ADMM algorithm for an extension of Convolutional BPDN including
a term penalising the vector total variation of the coefficient maps
:cite:`wohlberg-2017-convolutional`.
|
.. inheritance-diagram:: ConvBPDNVectorTV
:parts: 2
|
Solve the optimisation problem
.. math::
\mathrm{argmin}_\mathbf{x} \; \frac{1}{2}
\left\| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s}
\right\|_2^2 + \lambda \sum_m \| \mathbf{x}_m \|_1 +
\mu \left\| \sqrt{\sum_m \sum_i (G_i \mathbf{x}_m)^2} \right\|_1
\;\;,
where :math:`G_i` is an operator computing the derivative along index
:math:`i`, via the ADMM problem
.. math::
\mathrm{argmin}_\mathbf{x} \; (1/2) \left\| D \mathbf{x} -
\mathbf{s} \right\|_2^2 + \lambda
\| \mathbf{y}_L \|_1 + \mu \left\| \sqrt{\sum_{i=0}^{L-1}
I_B \mathbf{y}_i^2} \right\|_1 \quad \text{ such that } \quad
\left( \begin{array}{c} \Gamma_0 \\ \Gamma_1 \\ \vdots \\ I
\end{array} \right) \mathbf{x} =
\left( \begin{array}{c} \mathbf{y}_0 \\
\mathbf{y}_1 \\ \vdots \\ \mathbf{y}_L \end{array}
\right) \;\;,
where
.. math::
D = \left( \begin{array}{ccc} D_0 & D_1 & \ldots \end{array} \right)
\qquad
\mathbf{x} = \left( \begin{array}{c} \mathbf{x}_0 \\ \mathbf{x}_1 \\
\vdots \end{array} \right) \qquad
\Gamma_i = \left( \begin{array}{ccc}
G_i & 0 & \ldots \\ 0 & G_i & \ldots \\ \vdots & \vdots & \ddots
\end{array} \right) \qquad
I_B = \left( \begin{array}{ccc} I & I & \ldots \end{array} \right)
\;\;.
For multi-channel signals with a single-channel dictionary, vector TV is
applied jointly over the coefficient maps for channel :math:`c` and
filter :math:`m`. Since multi-channel signals with a multi-channel
dictionary also have one coefficient map per filter, the behaviour is
the same as for single-channel signals.
After termination of the :meth:`solve` method, attribute :attr:`itstat`
is a list of tuples representing statistics of each iteration. The
fields of the named tuple ``IterationStats`` are:
``Iter`` : Iteration number
``ObjFun`` : Objective function value
``DFid`` : Value of data fidelity term :math:`(1/2) \| \sum_m
\mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \|_2^2`
``RegL1`` : Value of regularisation term :math:`\sum_m \|
\mathbf{x}_m \|_1`
``RegTV`` : Value of regularisation term :math:`\left\|
\sqrt{\sum_m \sum_i (G_i \mathbf{x}_m)^2} \right\|_1`
``PrimalRsdl`` : Norm of primal residual
``DualRsdl`` : Norm of dual residual
``EpsPrimal`` : Primal residual stopping tolerance
:math:`\epsilon_{\mathrm{pri}}`
``EpsDual`` : Dual residual stopping tolerance
:math:`\epsilon_{\mathrm{dua}}`
``Rho`` : Penalty parameter
``XSlvRelRes`` : Relative residual of X step solver
``Time`` : Cumulative run time
"""
def __init__(self, D, S, lmbda, mu=0.0, opt=None, dimK=None, dimN=2):
    """
    |

    **Call graph**

    .. image:: ../_static/jonga/cbpdnvtv_init.svg
       :width: 20%
       :target: ../_static/jonga/cbpdnvtv_init.svg

    |

    Parameters
    ----------
    D : array_like
        Dictionary matrix
    S : array_like
        Signal vector or matrix
    lmbda : float
        Regularisation parameter (l1)
    mu : float
        Regularisation parameter (gradient)
    opt : :class:`ConvBPDNScalarTV.Options` object
        Algorithm options
    dimK : 0, 1, or None, optional (default None)
        Number of dimensions in input signal corresponding to multiple
        independent signals
    dimN : int, optional (default 2)
        Number of spatial dimensions
    """
    # All setup is shared with the scalar-TV parent class; only the
    # y step and regularisation evaluation differ
    super(ConvBPDNVectorTV, self).__init__(D, S, lmbda, mu, opt,
                                           dimK, dimN)
[docs]
def ystep(self):
    r"""Minimise Augmented Lagrangian with respect to
    :math:`\mathbf{y}`."""
    work = self.AX + self.U
    # Gradient blocks: l2 shrinkage grouped jointly over the filter
    # axis and the gradient-operator axis (vector TV)
    self.Y[..., :-1] = sp.prox_l2(work[..., :-1], self.mu / self.rho,
                                  axis=(self.cri.axisM, -1))
    # Identity block: soft thresholding weighted by Wl1
    self.Y[..., -1] = sp.prox_l1(work[..., -1],
                                 (self.lmbda / self.rho) * self.Wl1)
[docs]
def obfn_reg(self):
    """Compute regularisation terms and their contribution to the
    objective function value.
    """
    # l1 term: sum of absolute values of the weighted coefficients
    rl1 = np.abs(self.Wl1 * self.obfn_g1var()).sum()
    # Vector TV term: pointwise l2 norm taken jointly over the filter
    # axis and the gradient-operator axis
    gsq = np.sum(self.obfn_g0var()**2, axis=(self.cri.axisM, -1))
    rtv = np.sqrt(gsq).sum()
    return (self.lmbda*rl1 + self.mu*rtv, rl1, rtv)
[docs]
class ConvBPDNRecTV(admm.ADMM):
r"""
ADMM algorithm for an extension of Convolutional BPDN including
terms penalising the total variation of the reconstruction from the
sparse representation :cite:`wohlberg-2017-convolutional`.
|
.. inheritance-diagram:: ConvBPDNRecTV
:parts: 2
|
Solve the optimisation problem
.. math::
\mathrm{argmin}_\mathbf{x} \; \frac{1}{2}
\left\| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s}
\right\|_2^2 + \lambda \sum_m \| \mathbf{x}_m \|_1 +
\mu \left\| \sqrt{\sum_i \left( G_i \left( \sum_m \mathbf{d}_m *
\mathbf{x}_m \right) \right)^2} \right\|_1 \;\;,
where :math:`G_i` is an operator computing the derivative along index
:math:`i`, via the ADMM problem
.. math::
\mathrm{argmin}_\mathbf{x} \; (1/2) \left\| D
\mathbf{x} - \mathbf{s} \right\|_2^2 +
\lambda \| \mathbf{y}_0 \|_1 + \mu \left\|
\sqrt{\sum_{i=1}^L \mathbf{y}_i^2} \right\|_1 \quad \text{ such that }
\quad \left( \begin{array}{c} I \\ \Gamma_0 \\ \Gamma_1 \\ \vdots \\
\Gamma_{L-1} \end{array} \right) \mathbf{x} =
\left( \begin{array}{c} \mathbf{y}_0 \\
\mathbf{y}_1 \\ \mathbf{y}_2 \\ \vdots \\ \mathbf{y}_L \end{array}
\right) \;\;,
where
.. math::
D = \left( \begin{array}{ccc} D_0 & D_1 & \ldots \end{array} \right)
\qquad
\mathbf{x} = \left( \begin{array}{c} \mathbf{x}_0 \\ \mathbf{x}_1 \\
\vdots \end{array} \right) \qquad
\Gamma_i = \left( \begin{array}{ccc} G_{i,0} & G_{i,1} & \ldots
\end{array} \right) \;\;,
and linear operator :math:`G_{i,m}` is defined such that
.. math::
G_{i,m} \mathbf{x} = \mathbf{g}_i * \mathbf{d}_m * \mathbf{x}
\;\;,
where :math:`\mathbf{g}_i` is the filter corresponding to :math:`G_i`,
i.e. :math:`G_i \mathbf{x} = \mathbf{g}_i * \mathbf{x}`.
For multi-channel signals, vector TV is applied jointly over the
reconstructions of all channels.
After termination of the :meth:`solve` method, attribute :attr:`itstat`
is a list of tuples representing statistics of each iteration. The
fields of the named tuple ``IterationStats`` are:
``Iter`` : Iteration number
``ObjFun`` : Objective function value
``DFid`` : Value of data fidelity term :math:`(1/2) \| \sum_m
\mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \|_2^2`
``RegL1`` : Value of regularisation term :math:`\sum_m \|
\mathbf{x}_m \|_1`
``RegTV`` : Value of regularisation term :math:`\left\|
\sqrt{\sum_i \left( G_i \left( \sum_m \mathbf{d}_m *
\mathbf{x}_m \right) \right)^2} \right\|_1`
``PrimalRsdl`` : Norm of primal residual
``DualRsdl`` : Norm of dual residual
``EpsPrimal`` : Primal residual stopping tolerance
:math:`\epsilon_{\mathrm{pri}}`
``EpsDual`` : Dual residual stopping tolerance
:math:`\epsilon_{\mathrm{dua}}`
``Rho`` : Penalty parameter
``XSlvRelRes`` : Relative residual of X step solver
``Time`` : Cumulative run time
"""
[docs]
class Options(cbpdn.ConvBPDN.Options):
    r"""ConvBPDNRecTV algorithm options

    Options include all of those defined in
    :class:`.admm.cbpdn.ConvBPDN.Options`, together with an additional
    option:

      ``TVWeight`` : An array of weights :math:`w_m` for the term
      penalising the gradient of the coefficient maps. If this option
      is defined, the regularization term is :math:`\left\|
      \sqrt{\sum_i \left( G_i \left( \sum_m w_m (\mathbf{d}_m *
      \mathbf{x}_m) \right) \right)^2} \right\|_1` where :math:`w_m`
      is the weight for filter index :math:`m`. The array should be an
      :math:`M`-vector where :math:`M` is the number of filters in the
      dictionary.
    """

    # Start from the parent defaults and add the TV weighting option
    defaults = copy.deepcopy(cbpdn.ConvBPDN.Options.defaults)
    defaults.update({'TVWeight' : 1.0})

    def __init__(self, opt=None):
        """
        Parameters
        ----------
        opt : dict or None, optional (default None)
            ConvBPDNRecTV algorithm options
        """
        cbpdn.ConvBPDN.Options.__init__(self, {} if opt is None else opt)
# Names of the objective-function components recorded in IterationStats
itstat_fields_objfn = ('ObjFun', 'DFid', 'RegL1', 'RegTV')
# Extra per-iteration statistic: relative residual of the X step solver
itstat_fields_extra = ('XSlvRelRes',)
# Column headers for iteration status display
hdrtxt_objfn = ('Fnc', 'DFid', u('Regℓ1'), u('RegTV'))
# Mapping from display headers to IterationStats field names
hdrval_objfun = {'Fnc': 'ObjFun', 'DFid': 'DFid',
                 u('Regℓ1'): 'RegL1', u('RegTV'): 'RegTV'}
def __init__(self, D, S, lmbda, mu=0.0, opt=None, dimK=None, dimN=2):
    """
    |

    **Call graph**

    .. image:: ../_static/jonga/cbpdnrtv_init.svg
       :width: 20%
       :target: ../_static/jonga/cbpdnrtv_init.svg

    |

    Parameters
    ----------
    D : array_like
        Dictionary matrix
    S : array_like
        Signal vector or matrix
    lmbda : float
        Regularisation parameter (l1)
    mu : float
        Regularisation parameter (gradient)
    opt : :class:`ConvBPDNRecTV.Options` object
        Algorithm options
    dimK : 0, 1, or None, optional (default None)
        Number of dimensions in input signal corresponding to multiple
        independent signals
    dimN : int, optional (default 2)
        Number of spatial dimensions
    """
    if opt is None:
        opt = ConvBPDNRecTV.Options()

    # Infer problem dimensions and set relevant attributes of self
    self.cri = cr.CSC_ConvRepIndexing(D, S, dimK=dimK, dimN=dimN)

    # Call parent class __init__. NB: np.prod is used rather than the
    # np.product alias, which was deprecated and removed in NumPy 2.0
    Nx = np.prod(np.array(self.cri.shpX))
    # The Y variable concatenates the identity block with one gradient
    # block per spatial dimension and dictionary channel, along axisM
    yshape = list(self.cri.shpX)
    yshape[self.cri.axisM] += len(self.cri.axisN) * self.cri.Cd
    super(ConvBPDNRecTV, self).__init__(Nx, yshape, yshape,
                                        S.dtype, opt)

    # Set l1 term scaling and weight array
    self.lmbda = self.dtype.type(lmbda)
    self.Wl1 = np.asarray(opt['L1Weight'], dtype=self.dtype)
    self.Wl1 = self.Wl1.reshape(cr.l1Wshape(self.Wl1, self.cri))

    self.mu = self.dtype.type(mu)
    if hasattr(opt['TVWeight'], 'ndim') and opt['TVWeight'].ndim > 0:
        # Reshape TV weight array for broadcasting against the
        # coefficient array (weights vary along the filter axis)
        self.Wtv = np.asarray(opt['TVWeight'].reshape(
            (1,)*(dimN + 2) + opt['TVWeight'].shape), dtype=self.dtype)
    else:
        # Wtv is a scalar: no need to change shape
        self.Wtv = self.dtype.type(opt['TVWeight'])

    # Set penalty parameter
    self.set_attr('rho', opt['rho'], dval=(50.0*self.lmbda + 1.0),
                  dtype=self.dtype)
    # Set rho_xi attribute
    self.set_attr('rho_xi', opt['AutoRho', 'RsdlTarget'], dval=1.0,
                  dtype=self.dtype)

    # Reshape D and S to standard layout
    self.D = np.asarray(D.reshape(self.cri.shpD), dtype=self.dtype)
    self.S = np.asarray(S.reshape(self.cri.shpS), dtype=self.dtype)

    # Compute signal in DFT domain
    self.Sf = rfftn(self.S, None, self.cri.axisN)
    # Gradient filters in the DFT domain (GHGf is not needed here)
    self.Gf, GHGf = gradient_filters(self.cri.dimN+3, self.cri.axisN,
                                     self.cri.Nv, dtype=self.dtype)

    # Initialise byte-aligned arrays for pyfftw
    self.YU = empty_aligned(self.Y.shape, dtype=self.dtype)
    self.Xf = rfftn_empty_aligned(self.cri.shpX, self.cri.axisN,
                                  self.dtype)

    self.setdict()
[docs]
def setdict(self, D=None):
    """Set dictionary array.

    Parameters
    ----------
    D : array_like, optional (default None)
        New dictionary array; if None, retain the current dictionary
        and recompute the derived quantities from it.
    """
    if D is not None:
        self.D = np.asarray(D, dtype=self.dtype)
    # Dictionary in DFT domain
    self.Df = rfftn(self.D, self.cri.Nv, self.cri.axisN)
    # DFT-domain product of the gradient filters with the TV-weighted
    # dictionary, with a trailing axis indexing the gradient operators
    self.GDf = self.Gf * (self.Wtv * self.Df)[..., np.newaxis]
    # Compute D^H S
    self.DSf = np.conj(self.Df) * self.Sf
    if self.cri.Cd > 1:
        # Multi-channel dictionary: sum over the channel axis
        self.DSf = np.sum(self.DSf, axis=self.cri.axisC, keepdims=True)
[docs]
def block_sep0(self, Y):
    """Separate variable into the component corresponding to the Y0
    (identity) block: the first M entries along the final axis."""
    return Y[..., :self.cri.M]
[docs]
def block_sep1(self, Y):
    """Separate variable into component corresponding to Y1 in Y.

    Y1 comprises the gradient blocks, stored after the first M entries
    of the final axis of Y.
    """
    Y1 = Y[..., self.cri.M:]

    # If cri.Cd > 1 (multi-channel dictionary), we need to undo the
    # reshape performed in block_cat
    if self.cri.Cd > 1:
        shp = list(Y1.shape)
        shp[self.cri.axisM] = self.cri.dimN
        shp[self.cri.axisC] = self.cri.Cd
        Y1 = Y1.reshape(shp)

    # Axes are swapped here for similar reasons to those
    # motivating swapping in cbpdn.ConvTwoBlockCnstrnt.block_sep0
    Y1 = np.swapaxes(Y1[..., np.newaxis], self.cri.axisM, -1)

    return Y1
[docs]
def block_cat(self, Y0, Y1):
    """Concatenate components corresponding to Y0 and Y1 blocks
    into Y.

    This is the inverse of the :meth:`block_sep0` / :meth:`block_sep1`
    decomposition.
    """
    # Axes are swapped here for similar reasons to those
    # motivating swapping in cbpdn.ConvTwoBlockCnstrnt.block_cat
    Y1sa = np.swapaxes(Y1, self.cri.axisM, -1)[..., 0]

    # If cri.Cd > 1 (multi-channel dictionary) Y0 has a singleton
    # channel axis but Y1 has a non-singleton channel axis. To make
    # it possible to concatenate Y0 and Y1, we reshape Y1 by a
    # partial ravel of axisM and axisC onto axisM.
    if self.cri.Cd > 1:
        shp = list(Y1sa.shape)
        shp[self.cri.axisM] *= shp[self.cri.axisC]
        shp[self.cri.axisC] = 1
        Y1sa = Y1sa.reshape(shp)

    return np.concatenate((Y0, Y1sa), axis=self.cri.axisM)
[docs]
def xstep(self):
    r"""Minimise Augmented Lagrangian with respect to
    :math:`\mathbf{x}`.

    The solution is computed in the DFT domain, where the convolution
    and gradient operators are diagonalised.
    """
    self.YU[:] = self.Y - self.U
    YUf = rfftn(self.YU, None, self.cri.axisN)
    YUf0 = self.block_sep0(YUf)
    YUf1 = self.block_sep1(YUf)

    # RHS of the linear system: adjoint of the gradient blocks applied
    # to YUf1 (sum over the trailing gradient-operator axis), plus the
    # D^H S and identity-block contributions
    b = self.rho * np.sum(np.conj(self.GDf) * YUf1, axis=-1)
    if self.cri.Cd > 1:
        b = np.sum(b, axis=self.cri.axisC, keepdims=True)
    b += self.DSf + self.rho*YUf0

    # The structure of the linear system to be solved depends on the
    # number of channels in the signal and dictionary. Both branches are
    # the same in the single-channel signal case (the choice of handling
    # it via the 'else' branch is somewhat arbitrary).
    if self.cri.C > 1 and self.cri.Cd == 1:
        # Concatenate multiple GDf components on the final axis
        # of GDf (that indexes the number of gradient operators). For
        # multi-channel signals with a single-channel dictionary,
        # sl.solvemdbi_ism has to solve a linear system of rank dimN+1
        # (corresponding to the dictionary and a gradient operator per
        # spatial dimension)
        DfGDf = np.concatenate(
            [self.Df[..., np.newaxis],] +
            [np.sqrt(self.rho)*self.GDf[..., k, np.newaxis] for k
             in range(self.GDf.shape[-1])], axis=-1)
        self.Xf[:] = sl.solvemdbi_ism(DfGDf, self.rho, b[..., np.newaxis],
                                      self.cri.axisM, -1)[..., 0]
    else:
        # Concatenate multiple GDf components on axisC. For multi-channel
        # signals with a multi-channel dictionary, sl.solvemdbi_ism has
        # to solve a linear system of rank C.d (dimN+1) (corresponding to
        # the dictionary and a gradient operator per spatial dimension
        # for each channel) plus an identity.
        DfGDf = np.concatenate(
            [self.Df,] + [np.sqrt(self.rho)*self.GDf[..., k] for k
                          in range(self.GDf.shape[-1])],
            axis=self.cri.axisC)
        self.Xf[:] = sl.solvemdbi_ism(DfGDf, self.rho, b, self.cri.axisM,
                                      self.cri.axisC)

    self.X = irfftn(self.Xf, self.cri.Nv, self.cri.axisN)

    if self.opt['LinSolveCheck']:
        # Verify the linear solve by computing the relative residual of
        # the reconstructed LHS against the RHS b
        if self.cri.C > 1 and self.cri.Cd == 1:
            Dop = lambda x: sl.inner(DfGDf, x[..., np.newaxis],
                                     axis=self.cri.axisM)
            DHop = lambda x: sl.inner(np.conj(DfGDf), x, axis=-1)
            ax = DHop(Dop(self.Xf))[..., 0] + self.rho*self.Xf
        else:
            Dop = lambda x: sl.inner(DfGDf, x, axis=self.cri.axisM)
            DHop = lambda x: sl.inner(np.conj(DfGDf), x,
                                      axis=self.cri.axisC)
            ax = DHop(Dop(self.Xf)) + self.rho*self.Xf
        self.xrrs = sl.rrs(ax, b)
    else:
        self.xrrs = None
[docs]
def ystep(self):
    r"""Minimise Augmented Lagrangian with respect to
    :math:`\mathbf{y}`."""
    work = self.AX + self.U
    # Identity block: soft thresholding weighted by Wl1
    y0 = sp.prox_l1(self.block_sep0(work),
                    (self.lmbda / self.rho) * self.Wl1)
    self.block_sep0(self.Y)[:] = y0
    # Gradient blocks: l2 shrinkage grouped jointly over the channel
    # axis and the gradient-operator axis
    y1 = sp.prox_l2(self.block_sep1(work), self.mu / self.rho,
                    axis=(self.cri.axisC, -1))
    self.block_sep1(self.Y)[:] = y1
[docs]
def obfn_fvarf(self):
    """Variable to be evaluated in computing data fidelity term,
    depending on ``fEvalX`` option value.
    """
    if self.opt['fEvalX']:
        return self.Xf
    return rfftn(self.block_sep0(self.Y), None, self.cri.axisN)
[docs]
def var_y0(self):
    r"""Get :math:`\mathbf{y}_0` variable, the block of
    :math:`\mathbf{y}` corresponding to the identity operator
    (see :meth:`block_sep0`)."""
    return self.block_sep0(self.Y)
[docs]
def var_y1(self):
    r"""Get :math:`\mathbf{y}_1` variable: all blocks of
    :math:`\mathbf{y}` corresponding to a gradient operator
    (see :meth:`block_sep1`)."""
    return self.block_sep1(self.Y)
[docs]
def var_yx(self):
    r"""Get the component block of :math:`\mathbf{y}` constrained to
    equal :math:`\mathbf{x}`; here this is the identity block
    :math:`\mathbf{y}_0`."""
    return self.block_sep0(self.Y)
[docs]
def var_yx_idx(self):
    r"""Get index expression selecting the component block of
    :math:`\mathbf{y}` that is constrained to be equal to
    :math:`\mathbf{x}` (the first M entries of the final axis).
    """
    return (Ellipsis, slice(0, self.cri.M))
[docs]
def getmin(self):
    """Get minimiser after optimisation (X or the identity block of Y,
    depending on the ``ReturnX`` option)."""
    if self.opt['ReturnX']:
        return self.X
    return self.var_y0()
[docs]
def getcoef(self):
    """Get final coefficient array (alias for :meth:`getmin`)."""
    return self.getmin()
[docs]
def obfn_g0var(self):
    """Variable to be evaluated in computing the :math:`\\ell_1`
    regularisation term, depending on the ``gEvalY`` option value.
    """
    if self.opt['gEvalY']:
        return self.var_y0()
    # Using self.block_sep0(self.AXnr) rather than self.cnst_A0(self.X)
    # avoids redundant calls to self.cnst_A0
    return self.block_sep0(self.AXnr)
[docs]
def obfn_g1var(self):
    r"""Variable to be evaluated in computing the :math:`\ell_1`
    regularisation term, depending on the ``gEvalY`` option value.
    """
    # Use of self.block_sep1(self.AXnr) instead of self.cnst_A1(self.X)
    # reduces number of calls to self.cnst_A1
    return self.var_y1() if self.opt['gEvalY'] else \
        self.block_sep1(self.AXnr)
[docs]
def obfn_gvar(self):
    """Compatibility shim: expose the same variable as
    :meth:`obfn_g1var`, matching the :class:`.admm.cbpdn.ConvBPDN`
    interface so wrappers such as :class:`.AddMaskSim` can be used.
    """
    return self.obfn_g1var()
[docs]
def eval_objfn(self):
    """Compute components of objective function as well as total
    contribution to objective function.
    """
    fid = self.obfn_dfd()
    regs = self.obfn_reg()
    # regs[0] is the total weighted regularisation contribution;
    # the remaining entries are the individual unweighted terms
    return (fid + regs[0], fid) + regs[1:]
[docs]
def obfn_dfd(self):
    r"""Compute data fidelity term :math:`(1/2) \| \sum_m \mathbf{d}_m *
    \mathbf{x}_m - \mathbf{s} \|_2^2`.
    """
    # Residual in the DFT domain
    Rf = sl.inner(self.Df, self.obfn_fvarf(), axis=self.cri.axisM) - self.Sf
    return 0.5 * rfl2norm2(Rf, self.S.shape, axis=self.cri.axisN)
[docs]
def obfn_reg(self):
    """Compute regularisation terms and their contribution to the
    objective function value.
    """
    # l1 term: sum of absolute values of the weighted coefficients
    rl1 = np.abs(self.Wl1 * self.obfn_g0var()).sum()
    # TV term: pointwise l2 norm taken jointly over the channel axis
    # and the gradient-operator axis
    gsq = np.sum(self.obfn_g1var()**2, axis=(self.cri.axisC, -1))
    rtv = np.sqrt(gsq).sum()
    return (self.lmbda*rl1 + self.mu*rtv, rl1, rtv)
[docs]
def cnst_A0(self, X):
r"""Compute :math:`A_0 \mathbf{x}` component of ADMM problem
constraint. In this case :math:`A_0 \mathbf{x} = \mathbf{x}`.
"""
return X
[docs]
def cnst_A0T(self, Y0):
r"""Compute :math:`A_0^T \mathbf{y}_0` component of
:math:`A^T \mathbf{y}`. In this case :math:`A_0^T \mathbf{y}_0 =
\mathbf{y}_0`, i.e. :math:`A_0 = I`.
"""
return Y0
[docs]
def cnst_A1(self, X, Xf=None):
    r"""Compute :math:`A_1 \mathbf{x}` component of ADMM problem
    constraint. In this case :math:`A_1 \mathbf{x} = (\Gamma_0^T \;\;
    \Gamma_1^T \;\; \ldots )^T \mathbf{x}`.

    Parameters
    ----------
    X : array_like
        Coefficient array in the spatial domain
    Xf : array_like, optional (default None)
        DFT of X; computed from X if not supplied
    """
    if Xf is None:
        Xf = rfftn(X, axes=self.cri.axisN)
    # Apply the combined gradient/dictionary operators GDf in the DFT
    # domain, contracting over the filter axis
    return irfftn(sl.inner(
        self.GDf, Xf[..., np.newaxis], axis=self.cri.axisM), self.cri.Nv,
        self.cri.axisN)
[docs]
def cnst_A1T(self, Y1):
    r"""Compute :math:`A_1^T \mathbf{y}_1` component of
    :math:`A^T \mathbf{y}`. In this case :math:`A_1^T \mathbf{y}_1 =
    (\Gamma_0^T \;\; \Gamma_1^T \;\; \ldots) \mathbf{y}_1`.
    """
    Y1f = rfftn(Y1, None, axes=self.cri.axisN)
    # Adjoint applies the conjugate of the combined gradient/dictionary
    # operators in the DFT domain
    return irfftn(np.conj(self.GDf) * Y1f, self.cri.Nv,
                  self.cri.axisN)
[docs]
def cnst_A(self, X, Xf=None):
    r"""Compute :math:`A \mathbf{x}` component of ADMM problem
    constraint. In this case :math:`A \mathbf{x} = (I \;\; \Gamma_0^T
    \;\; \Gamma_1^T \;\; \ldots)^T \mathbf{x}`.
    """
    ident_block = self.cnst_A0(X)
    grad_block = self.cnst_A1(X, Xf)
    return self.block_cat(ident_block, grad_block)
[docs]
def cnst_AT(self, Y):
    r"""Compute :math:`A^T \mathbf{y}`. In this case
    :math:`A^T \mathbf{y} = (I \;\; \Gamma_0^T \;\; \Gamma_1^T \;\;
    \ldots) \mathbf{y}`.
    """
    ident_part = self.cnst_A0T(self.block_sep0(Y))
    grad_part = np.sum(self.cnst_A1T(self.block_sep1(Y)), axis=-1)
    return ident_part + grad_part
[docs]
def cnst_B(self, Y):
r"""Compute :math:`B \mathbf{y}` component of ADMM problem
constraint. In this case :math:`B \mathbf{y} = -\mathbf{y}`.
"""
return -Y
[docs]
def cnst_c(self):
r"""Compute constant component :math:`\mathbf{c}` of ADMM problem
constraint. In this case :math:`\mathbf{c} = \mathbf{0}`.
"""
return 0.0
[docs]
def relax_AX(self):
    """Implement relaxation if option ``RelaxParam`` != 1.0."""
    # We need to keep the non-relaxed version of AX since it is
    # required for computation of primal residual r
    self.AXnr = self.cnst_A(self.X, self.Xf)
    if self.rlx == 1.0:
        # If RelaxParam option is 1.0 there is no relaxation
        self.AX = self.AXnr
    else:
        # Avoid calling cnst_c() more than once in case it is expensive
        # (e.g. due to allocation of a large block of memory)
        if not hasattr(self, '_cnst_c'):
            self._cnst_c = self.cnst_c()
        # Compute relaxed version of AX: a convex-style combination of
        # the actual AX and the value -(B y - c) it is driven towards
        alpha = self.rlx
        self.AX = alpha*self.AXnr - (1-alpha)*(self.cnst_B(self.Y) -
                                               self._cnst_c)
[docs]
def reconstruct(self, X=None):
    """Reconstruct representation (sum over filters of the dictionary
    convolved with the coefficient maps)."""
    Xf = self.Xf if X is None else rfftn(X, None, self.cri.axisN)
    Sf = np.sum(self.Df * Xf, axis=self.cri.axisM)
    return irfftn(Sf, self.cri.Nv, self.cri.axisN)