Source code for sporco.admm.cbpdn

# -*- coding: utf-8 -*-
# Copyright (C) 2015-2019 by Brendt Wohlberg <brendt@ieee.org>
# All rights reserved. BSD 3-clause License.
# This file is part of the SPORCO package. Details of the copyright
# and user license can be found in the 'LICENSE.txt' file distributed
# with the package.

"""Classes for ADMM algorithm for the Convolutional BPDN problem"""

from __future__ import division
from __future__ import absolute_import
from __future__ import print_function
from builtins import range

import copy
from types import MethodType
import numpy as np

from sporco.admm import admm
import sporco.cnvrep as cr
import sporco.linalg as sl
import sporco.prox as sp
from sporco.util import u


__author__ = """Brendt Wohlberg <brendt@ieee.org>"""


[docs]class GenericConvBPDN(admm.ADMMEqual): r""" Base class for ADMM algorithm for solving variants of the Convolutional BPDN (CBPDN) :cite:`wohlberg-2014-efficient` :cite:`wohlberg-2016-efficient` :cite:`wohlberg-2016-convolutional` problem. | .. inheritance-diagram:: GenericConvBPDN :parts: 2 | The generic problem form is .. math:: \mathrm{argmin}_\mathbf{x} \; (1/2) \left\| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \right\|_2^2 + g( \{ \mathbf{x}_m \} ) for input image :math:`\mathbf{s}`, dictionary filters :math:`\mathbf{d}_m`, and coefficient maps :math:`\mathbf{x}_m`, and where :math:`g(\cdot)` is a penalty term or the indicator function of a constraint. It is solved via the ADMM problem .. math:: \mathrm{argmin}_{\mathbf{x}, \mathbf{y}} \; (1/2) \left\| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \right\|_2^2 + g( \{ \mathbf{y}_m \} ) \quad \text{such that} \quad \mathbf{x}_m = \mathbf{y}_m \;\;. After termination of the :meth:`solve` method, attribute :attr:`itstat` is a list of tuples representing statistics of each iteration. The fields of the named tuple ``IterationStats`` are: ``Iter`` : Iteration number ``ObjFun`` : Objective function value ``DFid`` : Value of data fidelity term :math:`(1/2) \| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \|_2^2` ``Reg`` : Value of regularisation term ``PrimalRsdl`` : Norm of primal residual ``DualRsdl`` : Norm of dual residual ``EpsPrimal`` : Primal residual stopping tolerance :math:`\epsilon_{\mathrm{pri}}` ``EpsDual`` : Dual residual stopping tolerance :math:`\epsilon_{\mathrm{dua}}` ``Rho`` : Penalty parameter ``XSlvRelRes`` : Relative residual of X step solver ``Time`` : Cumulative run time """
[docs] class Options(admm.ADMMEqual.Options): """ConvBPDN algorithm options Options include all of those defined in :class:`.admm.ADMMEqual.Options`, together with additional options: ``AuxVarObj`` : Flag indicating whether the objective function should be evaluated using variable X (``False``) or Y (``True``) as its argument. Setting this flag to ``True`` often gives a better estimate of the objective function, but at additional computational cost. ``LinSolveCheck`` : Flag indicating whether to compute relative residual of X step solver. ``HighMemSolve`` : Flag indicating whether to use a slightly faster algorithm at the expense of higher memory usage. ``NonNegCoef`` : Flag indicating whether to force solution to be non-negative. ``NoBndryCross`` : Flag indicating whether all solution coefficients corresponding to filters crossing the image boundary should be forced to zero. """ defaults = copy.deepcopy(admm.ADMMEqual.Options.defaults) # Warning: although __setitem__ below takes care of setting # 'fEvalX' and 'gEvalY' from the value of 'AuxVarObj', this # cannot be relied upon for initialisation since the order of # initialisation of the dictionary keys is not deterministic; # if 'AuxVarObj' is initialised first, the other two keys are # correctly set, but this setting is overwritten when 'fEvalX' # and 'gEvalY' are themselves initialised defaults.update({'AuxVarObj': False, 'fEvalX': True, 'gEvalY': False, 'ReturnX': False, 'HighMemSolve': False, 'LinSolveCheck': False, 'RelaxParam': 1.8, 'NonNegCoef': False, 'NoBndryCross': False}) defaults['AutoRho'].update({'Enabled': True, 'Period': 1, 'AutoScaling': True, 'Scaling': 1000.0, 'RsdlRatio': 1.2}) def __init__(self, opt=None): """ Parameters ---------- opt : dict or None, optional (default None) GenericConvBPDN algorithm options """ if opt is None: opt = {} admm.ADMMEqual.Options.__init__(self, opt) def __setitem__(self, key, value): """Set options 'fEvalX' and 'gEvalY' appropriately when option 'AuxVarObj' is set. """ admm.ADMMEqual.Options.__setitem__(self, key, value) if key == 'AuxVarObj': if value is True: self['fEvalX'] = False self['gEvalY'] = True else: self['fEvalX'] = True self['gEvalY'] = False
itstat_fields_objfn = ('ObjFun', 'DFid', 'Reg') itstat_fields_extra = ('XSlvRelRes',) hdrtxt_objfn = ('Fnc', 'DFid', 'Reg') hdrval_objfun = {'Fnc': 'ObjFun', 'DFid': 'DFid', 'Reg': 'Reg'} def __init__(self, D, S, opt=None, dimK=None, dimN=2): """ This class supports an arbitrary number of spatial dimensions, `dimN`, with a default of 2. The input dictionary `D` is either `dimN` + 1 dimensional, in which case each spatial component (image in the default case) is assumed to consist of a single channel, or `dimN` + 2 dimensional, in which case the final dimension is assumed to contain the channels (e.g. colour channels in the case of images). The input signal set `S` is either `dimN` dimensional (no channels, only one signal), `dimN` + 1 dimensional (either multiple channels or multiple signals), or `dimN` + 2 dimensional (multiple channels and multiple signals). Determination of problem dimensions is handled by :class:`.cnvrep.CSC_ConvRepIndexing`. Parameters ---------- D : array_like Dictionary array S : array_like Signal array opt : :class:`GenericConvBPDN.Options` object Algorithm options dimK : 0, 1, or None, optional (default None) Number of dimensions in input signal corresponding to multiple independent signals dimN : int, optional (default 2) Number of spatial/temporal dimensions """ # Set default options if none specified if opt is None: opt = GenericConvBPDN.Options() # Infer problem dimensions and set relevant attributes of self if not hasattr(self, 'cri'): self.cri = cr.CSC_ConvRepIndexing(D, S, dimK=dimK, dimN=dimN) # Call parent class __init__ super(GenericConvBPDN, self).__init__(self.cri.shpX, S.dtype, opt) # Reshape D and S to standard layout self.D = np.asarray(D.reshape(self.cri.shpD), dtype=self.dtype) self.S = np.asarray(S.reshape(self.cri.shpS), dtype=self.dtype) # Compute signal in DFT domain self.Sf = sl.rfftn(self.S, None, self.cri.axisN) # Initialise byte-aligned arrays for pyfftw self.YU = sl.pyfftw_empty_aligned(self.Y.shape, dtype=self.dtype) self.Xf = sl.pyfftw_rfftn_empty_aligned(self.Y.shape, self.cri.axisN, self.dtype) self.setdict()
[docs] def setdict(self, D=None): """Set dictionary array.""" if D is not None: self.D = np.asarray(D, dtype=self.dtype) self.Df = sl.rfftn(self.D, self.cri.Nv, self.cri.axisN) # Compute D^H S self.DSf = np.conj(self.Df) * self.Sf if self.cri.Cd > 1: self.DSf = np.sum(self.DSf, axis=self.cri.axisC, keepdims=True) if self.opt['HighMemSolve'] and self.cri.Cd == 1: self.c = sl.solvedbi_sm_c(self.Df, np.conj(self.Df), self.rho, self.cri.axisM) else: self.c = None
[docs] def getcoef(self): """Get final coefficient array.""" return self.getmin()
[docs] def xstep(self): r"""Minimise Augmented Lagrangian with respect to :math:`\mathbf{x}`.""" self.YU[:] = self.Y - self.U b = self.DSf + self.rho * sl.rfftn(self.YU, None, self.cri.axisN) if self.cri.Cd == 1: self.Xf[:] = sl.solvedbi_sm(self.Df, self.rho, b, self.c, self.cri.axisM) else: self.Xf[:] = sl.solvemdbi_ism(self.Df, self.rho, b, self.cri.axisM, self.cri.axisC) self.X = sl.irfftn(self.Xf, self.cri.Nv, self.cri.axisN) if self.opt['LinSolveCheck']: Dop = lambda x: sl.inner(self.Df, x, axis=self.cri.axisM) if self.cri.Cd == 1: DHop = lambda x: np.conj(self.Df) * x else: DHop = lambda x: sl.inner(np.conj(self.Df), x, axis=self.cri.axisC) ax = DHop(Dop(self.Xf)) + self.rho * self.Xf self.xrrs = sl.rrs(ax, b) else: self.xrrs = None
[docs] def ystep(self): r"""Minimise Augmented Lagrangian with respect to :math:`\mathbf{y}`. If this method is not overridden, the problem is solved without any regularisation other than the option enforcement of non-negativity of the solution and filter boundary crossing supression. When it is overridden, it should be explicitly called at the end of the overriding method. """ if self.opt['NonNegCoef']: self.Y[self.Y < 0.0] = 0.0 if self.opt['NoBndryCross']: for n in range(0, self.cri.dimN): self.Y[(slice(None),) * n + (slice(1 - self.D.shape[n], None),)] = 0.0
[docs] def obfn_fvarf(self): """Variable to be evaluated in computing data fidelity term, depending on ``fEvalX`` option value. """ return self.Xf if self.opt['fEvalX'] else \ sl.rfftn(self.Y, None, self.cri.axisN)
[docs] def eval_objfn(self): """Compute components of objective function as well as total contribution to objective function. """ dfd = self.obfn_dfd() reg = self.obfn_reg() obj = dfd + reg[0] return (obj, dfd) + reg[1:]
[docs] def obfn_dfd(self): r"""Compute data fidelity term :math:`(1/2) \| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \|_2^2`. """ Ef = sl.inner(self.Df, self.obfn_fvarf(), axis=self.cri.axisM) - \ self.Sf return sl.rfl2norm2(Ef, self.S.shape, axis=self.cri.axisN) / 2.0
[docs] def obfn_reg(self): """Compute regularisation term(s) and contribution to objective function. """ raise NotImplementedError()
[docs] def itstat_extra(self): """Non-standard entries for the iteration stats record tuple.""" return (self.xrrs,)
[docs] def rhochange(self): """Updated cached c array when rho changes.""" if self.opt['HighMemSolve'] and self.cri.Cd == 1: self.c = sl.solvedbi_sm_c(self.Df, np.conj(self.Df), self.rho, self.cri.axisM)
[docs] def reconstruct(self, X=None): """Reconstruct representation.""" if X is None: X = self.Y Xf = sl.rfftn(X, None, self.cri.axisN) Sf = np.sum(self.Df * Xf, axis=self.cri.axisM) return sl.irfftn(Sf, self.cri.Nv, self.cri.axisN)
[docs]class ConvBPDN(GenericConvBPDN): r""" ADMM algorithm for the Convolutional BPDN (CBPDN) :cite:`wohlberg-2014-efficient` :cite:`wohlberg-2016-efficient` :cite:`wohlberg-2016-convolutional` problem. | .. inheritance-diagram:: ConvBPDN :parts: 2 | Solve the optimisation problem .. math:: \mathrm{argmin}_\mathbf{x} \; (1/2) \left\| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \right\|_2^2 + \lambda \sum_m \| \mathbf{x}_m \|_1 for input image :math:`\mathbf{s}`, dictionary filters :math:`\mathbf{d}_m`, and coefficient maps :math:`\mathbf{x}_m`, via the ADMM problem .. math:: \mathrm{argmin}_{\mathbf{x}, \mathbf{y}} \; (1/2) \left\| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \right\|_2^2 + \lambda \sum_m \| \mathbf{y}_m \|_1 \quad \text{such that} \quad \mathbf{x}_m = \mathbf{y}_m \;\;. Multi-image and multi-channel problems are also supported. The multi-image problem is .. math:: \mathrm{argmin}_\mathbf{x} \; (1/2) \sum_k \left\| \sum_m \mathbf{d}_m * \mathbf{x}_{k,m} - \mathbf{s}_k \right\|_2^2 + \lambda \sum_k \sum_m \| \mathbf{x}_{k,m} \|_1 with input images :math:`\mathbf{s}_k` and coefficient maps :math:`\mathbf{x}_{k,m}`, and the multi-channel problem with input image channels :math:`\mathbf{s}_c` is either .. math:: \mathrm{argmin}_\mathbf{x} \; (1/2) \sum_c \left\| \sum_m \mathbf{d}_m * \mathbf{x}_{c,m} - \mathbf{s}_c \right\|_2^2 + \lambda \sum_c \sum_m \| \mathbf{x}_{c,m} \|_1 with single-channel dictionary filters :math:`\mathbf{d}_m` and multi-channel coefficient maps :math:`\mathbf{x}_{c,m}`, or .. math:: \mathrm{argmin}_\mathbf{x} \; (1/2) \sum_c \left\| \sum_m \mathbf{d}_{c,m} * \mathbf{x}_m - \mathbf{s}_c \right\|_2^2 + \lambda \sum_m \| \mathbf{x}_m \|_1 with multi-channel dictionary filters :math:`\mathbf{d}_{c,m}` and single-channel coefficient maps :math:`\mathbf{x}_m`. After termination of the :meth:`solve` method, attribute :attr:`itstat` is a list of tuples representing statistics of each iteration. The fields of the named tuple ``IterationStats`` are: ``Iter`` : Iteration number ``ObjFun`` : Objective function value ``DFid`` : Value of data fidelity term :math:`(1/2) \| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \|_2^2` ``RegL1`` : Value of regularisation term :math:`\sum_m \| \mathbf{x}_m \|_1` ``PrimalRsdl`` : Norm of primal residual ``DualRsdl`` : Norm of dual residual ``EpsPrimal`` : Primal residual stopping tolerance :math:`\epsilon_{\mathrm{pri}}` ``EpsDual`` : Dual residual stopping tolerance :math:`\epsilon_{\mathrm{dua}}` ``Rho`` : Penalty parameter ``XSlvRelRes`` : Relative residual of X step solver ``Time`` : Cumulative run time """
[docs] class Options(GenericConvBPDN.Options): r"""ConvBPDN algorithm options Options include all of those defined in :class:`.admm.ADMMEqual.Options`, together with additional options: ``L1Weight`` : An array of weights for the :math:`\ell_1` norm. The array shape must be such that the array is compatible for multiplication with the `X`/`Y` variables (see :func:`.cnvrep.l1Wshape` for more details). If this option is defined, the regularization term is :math:`\lambda \sum_m \| \mathbf{w}_m \odot \mathbf{x}_m \|_1` where :math:`\mathbf{w}_m` denotes slices of the weighting array on the filter index axis. """ defaults = copy.deepcopy(GenericConvBPDN.Options.defaults) defaults.update({'L1Weight': 1.0}) def __init__(self, opt=None): """ Parameters ---------- opt : dict or None, optional (default None) ConvBPDN algorithm options """ if opt is None: opt = {} GenericConvBPDN.Options.__init__(self, opt)
itstat_fields_objfn = ('ObjFun', 'DFid', 'RegL1') hdrtxt_objfn = ('Fnc', 'DFid', u('Regℓ1')) hdrval_objfun = {'Fnc': 'ObjFun', 'DFid': 'DFid', u('Regℓ1'): 'RegL1'} def __init__(self, D, S, lmbda=None, opt=None, dimK=None, dimN=2): """ This class supports an arbitrary number of spatial dimensions, `dimN`, with a default of 2. The input dictionary `D` is either `dimN` + 1 dimensional, in which case each spatial component (image in the default case) is assumed to consist of a single channel, or `dimN` + 2 dimensional, in which case the final dimension is assumed to contain the channels (e.g. colour channels in the case of images). The input signal set `S` is either `dimN` dimensional (no channels, only one signal), `dimN` + 1 dimensional (either multiple channels or multiple signals), or `dimN` + 2 dimensional (multiple channels and multiple signals). Determination of problem dimensions is handled by :class:`.cnvrep.CSC_ConvRepIndexing`. | **Call graph** .. image:: ../_static/jonga/cbpdn_init.svg :width: 20% :target: ../_static/jonga/cbpdn_init.svg | Parameters ---------- D : array_like Dictionary array S : array_like Signal array lmbda : float Regularisation parameter opt : :class:`ConvBPDN.Options` object Algorithm options dimK : 0, 1, or None, optional (default None) Number of dimensions in input signal corresponding to multiple independent signals dimN : int, optional (default 2) Number of spatial/temporal dimensions """ # Set default options if none specified if opt is None: opt = ConvBPDN.Options() # Set dtype attribute based on S.dtype and opt['DataType'] self.set_dtype(opt, S.dtype) # Set default lambda value if not specified if lmbda is None: cri = cr.CSC_ConvRepIndexing(D, S, dimK=dimK, dimN=dimN) Df = sl.rfftn(D.reshape(cri.shpD), cri.Nv, axes=cri.axisN) Sf = sl.rfftn(S.reshape(cri.shpS), axes=cri.axisN) b = np.conj(Df) * Sf lmbda = 0.1 * abs(b).max() # Set l1 term scaling self.lmbda = self.dtype.type(lmbda) # Set penalty parameter self.set_attr('rho', opt['rho'], dval=(50.0 * self.lmbda + 1.0), dtype=self.dtype) # Set rho_xi attribute (see Sec. VI.C of wohlberg-2015-adaptive) if self.lmbda != 0.0: rho_xi = float((1.0 + (18.3)**(np.log10(self.lmbda) + 1.0))) else: rho_xi = 1.0 self.set_attr('rho_xi', opt['AutoRho', 'RsdlTarget'], dval=rho_xi, dtype=self.dtype) # Call parent class __init__ super(ConvBPDN, self).__init__(D, S, opt, dimK, dimN) # Set l1 term weight array self.wl1 = np.asarray(opt['L1Weight'], dtype=self.dtype) self.wl1 = self.wl1.reshape(cr.l1Wshape(self.wl1, self.cri))
[docs] def uinit(self, ushape): """Return initialiser for working variable U""" if self.opt['Y0'] is None: return np.zeros(ushape, dtype=self.dtype) else: # If initial Y is non-zero, initial U is chosen so that # the relevant dual optimality criterion (see (3.10) in # boyd-2010-distributed) is satisfied. return (self.lmbda/self.rho)*np.sign(self.Y)
[docs] def ystep(self): r"""Minimise Augmented Lagrangian with respect to :math:`\mathbf{y}`.""" self.Y = sp.prox_l1(self.AX + self.U, (self.lmbda / self.rho) * self.wl1) super(ConvBPDN, self).ystep()
[docs] def obfn_reg(self): """Compute regularisation term and contribution to objective function. """ rl1 = np.linalg.norm((self.wl1 * self.obfn_gvar()).ravel(), 1) return (self.lmbda*rl1, rl1)
[docs]class ConvBPDNJoint(ConvBPDN): r""" ADMM algorithm for Convolutional BPDN with joint sparsity via an :math:`\ell_{2,1}` norm term :cite:`wohlberg-2016-convolutional` (the :math:`\ell_2` norms are computed over the channel index). | .. inheritance-diagram:: ConvBPDNJoint :parts: 2 | Solve the optimisation problem .. math:: \mathrm{argmin}_\mathbf{x} \; (1/2) \sum_c \left\| \sum_m \mathbf{d}_m * \mathbf{x}_{c,m} - \mathbf{s}_c \right\|_2^2 + \lambda \sum_c \sum_m \| \mathbf{x}_{c,m} \|_1 + \mu \| \{ \mathbf{x}_{c,m} \} \|_{2,1} with input images :math:`\mathbf{s}_c`, dictionary filters :math:`\mathbf{d}_m`, and coefficient maps :math:`\mathbf{x}_{c,m}`, via the ADMM problem .. math:: \mathrm{argmin}_{\mathbf{x}, \mathbf{y}} \; (1/2) \sum_c \left\| \sum_m \mathbf{d}_m * \mathbf{x}_{c,m} - \mathbf{s}_c \right\|_2^2 + \lambda \sum_c \sum_m \| \mathbf{y}_{c,m} \|_1 + \mu \| \{ \mathbf{y}_{c,m} \} \|_{2,1} \quad \text{such that} \quad \mathbf{x}_{c,m} = \mathbf{y}_{c,m} \;\;. After termination of the :meth:`solve` method, attribute :attr:`itstat` is a list of tuples representing statistics of each iteration. The fields of the named tuple ``IterationStats`` are: ``Iter`` : Iteration number ``ObjFun`` : Objective function value ``DFid`` : Value of data fidelity term :math:`(1/2) \sum_c \left\| \sum_m \mathbf{d}_m * \mathbf{x}_{k,m} - \mathbf{s}_c \right\|_2^2` ``RegL1`` : Value of regularisation term :math:`\sum_c \sum_m \| \mathbf{x}_{c,m} \|_1` ``RegL21`` : Value of regularisation term :math:`\| \{ \mathbf{x}_{c,m} \} \|_{2,1}` ``PrimalRsdl`` : Norm of primal residual ``DualRsdl`` : Norm of dual Residual ``EpsPrimal`` : Primal residual stopping tolerance :math:`\epsilon_{\mathrm{pri}}` ``EpsDual`` : Dual residual stopping tolerance :math:`\epsilon_{\mathrm{dua}}` ``Rho`` : Penalty parameter ``Time`` : Cumulative run time """
[docs] class Options(ConvBPDN.Options): r"""ConvBPDNJoint algorithm options Options include all of those defined in :class:`ConvBPDN.Options`, together with additional options: ``L21Weight`` : An array of weights for the :math:`\ell_{2,1}` norm. The array shape must be such that the array is compatible for multiplication with the X/Y variables *after* the sum over ``axisC`` performed during the computation of the :math:`\ell_{2,1}` norm. If this option is defined, the regularization term is :math:`\mu \sum_i w_i \sqrt{ \sum_c \mathbf{x}_{i,c}^2 }` where :math:`w_i` are the elements of the weight array, subscript :math:`c` indexes the channel axis and subscript :math:`i` indexes all other axes. """ defaults = copy.deepcopy(ConvBPDN.Options.defaults) defaults.update({'L21Weight': 1.0}) def __init__(self, opt=None): """ Parameters ---------- opt : dict or None, optional (default None) ConvBPDNJoint algorithm options """ if opt is None: opt = {} ConvBPDN.Options.__init__(self, opt)
itstat_fields_objfn = ('ObjFun', 'DFid', 'RegL1', 'RegL21') hdrtxt_objfn = ('Fnc', 'DFid', u('Regℓ1'), u('Regℓ2,1')) hdrval_objfun = {'Fnc': 'ObjFun', 'DFid': 'DFid', u('Regℓ1'): 'RegL1', u('Regℓ2,1'): 'RegL21'} def __init__(self, D, S, lmbda=None, mu=0.0, opt=None, dimK=None, dimN=2): """ | **Call graph** .. image:: ../_static/jonga/cbpdnjnt_init.svg :width: 20% :target: ../_static/jonga/cbpdnjnt_init.svg | Parameters ---------- D : array_like Dictionary array S : array_like Signal array lmbda : float Regularisation parameter (l1) mu : float Regularisation parameter (l2,1) opt : :class:`ConvBPDNJoint.Options` object Algorithm options dimK : 0, 1, or None, optional (default None) Number of dimensions in input signal corresponding to multiple independent signals dimN : int, optional (default 2) Number of spatial dimensions """ if opt is None: opt = ConvBPDN.Options() super(ConvBPDNJoint, self).__init__(D, S, lmbda, opt, dimK=dimK, dimN=dimN) self.mu = self.dtype.type(mu) self.wl21 = np.asarray(opt['L21Weight'], dtype=self.dtype)
[docs] def ystep(self): r"""Minimise Augmented Lagrangian with respect to :math:`\mathbf{y}`. """ self.Y = sp.prox_l1l2(self.AX + self.U, (self.lmbda/self.rho)*self.wl1, (self.mu/self.rho)*self.wl21, axis=self.cri.axisC) GenericConvBPDN.ystep(self)
[docs] def obfn_reg(self): r"""Compute regularisation terms and contribution to objective function. Regularisation terms are :math:`\| Y \|_1` and :math:`\| Y \|_{2,1}`. """ rl1 = np.linalg.norm((self.wl1 * self.obfn_gvar()).ravel(), 1) rl21 = np.sum(self.wl21 * np.sqrt(np.sum(self.obfn_gvar()**2, axis=self.cri.axisC))) return (self.lmbda*rl1 + self.mu*rl21, rl1, rl21)
[docs]class ConvElasticNet(ConvBPDN): r""" ADMM algorithm for a convolutional form of the elastic net problem :cite:`zou-2005-regularization`. | .. inheritance-diagram:: ConvElasticNet :parts: 2 | Solve the optimisation problem .. math:: \mathrm{argmin}_\mathbf{x} \; (1/2) \left\| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \right\|_2^2 + \lambda \sum_m \| \mathbf{x}_m \|_1 + (\mu/2) \sum_m \| \mathbf{x}_m \|_2^2 via the ADMM problem .. math:: \mathrm{argmin}_{\mathbf{x}, \mathbf{y}} \; (1/2) \left\| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \right\|_2^2 + \lambda \sum_m \| \mathbf{y}_m \|_1 + (\mu/2) \sum_m \| \mathbf{x}_m \|_2^2 \quad \text{such that} \quad \mathbf{x}_m = \mathbf{y}_m \;\;. After termination of the :meth:`solve` method, attribute :attr:`itstat` is a list of tuples representing statistics of each iteration. The fields of the named tuple ``IterationStats`` are: ``Iter`` : Iteration number ``ObjFun`` : Objective function value ``DFid`` : Value of data fidelity term :math:`(1/2) \| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \|_2^2` ``RegL1`` : Value of regularisation term :math:`\sum_m \| \mathbf{x}_m \|_1` ``RegL2`` : Value of regularisation term :math:`(1/2) \sum_m \| \mathbf{x}_m \|_2^2` ``PrimalRsdl`` : Norm of primal residual ``DualRsdl`` : Norm of dual residual ``EpsPrimal`` : Primal residual stopping tolerance :math:`\epsilon_{\mathrm{pri}}` ``EpsDual`` : Dual residual stopping tolerance :math:`\epsilon_{\mathrm{dua}}` ``Rho`` : Penalty parameter ``XSlvRelRes`` : Relative residual of X step solver ``Time`` : Cumulative run time """ itstat_fields_objfn = ('ObjFun', 'DFid', 'RegL1', 'RegL2') hdrtxt_objfn = ('Fnc', 'DFid', u('Regℓ1'), u('Regℓ2')) hdrval_objfun = {'Fnc': 'ObjFun', 'DFid': 'DFid', u('Regℓ1'): 'RegL1', u('Regℓ2'): 'RegL2'} def __init__(self, D, S, lmbda=None, mu=0.0, opt=None, dimK=None, dimN=2): """ | **Call graph** .. image:: ../_static/jonga/celnet_init.svg :width: 20% :target: ../_static/jonga/celnet_init.svg | Parameters ---------- D : array_like Dictionary matrix S : array_like Signal vector or matrix lmbda : float Regularisation parameter (l1) mu : float Regularisation parameter (l2) opt : :class:`ConvBPDN.Options` object Algorithm options dimK : 0, 1, or None, optional (default None) Number of dimensions in input signal corresponding to multiple independent signals dimN : int, optional (default 2) Number of spatial dimensions """ if opt is None: opt = ConvBPDN.Options() # Set dtype attribute based on S.dtype and opt['DataType'] self.set_dtype(opt, S.dtype) self.mu = self.dtype.type(mu) super(ConvElasticNet, self).__init__(D, S, lmbda, opt, dimK=dimK, dimN=dimN)
[docs] def setdict(self, D=None): """Set dictionary array.""" if D is not None: self.D = np.asarray(D, dtype=self.dtype) self.Df = sl.rfftn(self.D, self.cri.Nv, self.cri.axisN) # Compute D^H S self.DSf = np.conj(self.Df) * self.Sf if self.cri.Cd > 1: self.DSf = np.sum(self.DSf, axis=self.cri.axisC, keepdims=True) if self.opt['HighMemSolve'] and self.cri.Cd == 1: self.c = sl.solvedbi_sm_c(self.Df, np.conj(self.Df), self.mu + self.rho, self.cri.axisM) else: self.c = None
[docs] def xstep(self): r"""Minimise Augmented Lagrangian with respect to :math:`\mathbf{x}`. """ self.YU[:] = self.Y - self.U b = self.DSf + self.rho*sl.rfftn(self.YU, None, self.cri.axisN) if self.cri.Cd == 1: self.Xf[:] = sl.solvedbi_sm(self.Df, self.mu + self.rho, b, self.c, self.cri.axisM) else: self.Xf[:] = sl.solvemdbi_ism(self.Df, self.mu + self.rho, b, self.cri.axisM, self.cri.axisC) self.X = sl.irfftn(self.Xf, self.cri.Nv, self.cri.axisN) if self.opt['LinSolveCheck']: Dop = lambda x: sl.inner(self.Df, x, axis=self.cri.axisM) if self.cri.Cd == 1: DHop = lambda x: np.conj(self.Df) * x else: DHop = lambda x: sl.inner(np.conj(self.Df), x, axis=self.cri.axisC) ax = DHop(Dop(self.Xf)) + (self.mu + self.rho)*self.Xf self.xrrs = sl.rrs(ax, b) else: self.xrrs = None
[docs] def obfn_reg(self): """Compute regularisation term and contribution to objective function. """ rl1 = np.linalg.norm((self.wl1 * self.obfn_gvar()).ravel(), 1) rl2 = 0.5*np.linalg.norm(self.obfn_gvar())**2 return (self.lmbda*rl1 + self.mu*rl2, rl1, rl2)
[docs]class ConvBPDNGradReg(ConvBPDN): r""" ADMM algorithm for an extension of Convolutional BPDN including a term penalising the gradient of the coefficient maps :cite:`wohlberg-2016-convolutional2`. | .. inheritance-diagram:: ConvBPDNGradReg :parts: 2 | Solve the optimisation problem .. math:: \mathrm{argmin}_\mathbf{x} \; (1/2) \left\| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \right\|_2^2 + \lambda \sum_m \| \mathbf{x}_m \|_1 + (\mu/2) \sum_i \sum_m \| G_i \mathbf{x}_m \|_2^2 \;\;, where :math:`G_i` is an operator computing the derivative along index :math:`i`, via the ADMM problem .. math:: \mathrm{argmin}_{\mathbf{x},\mathbf{y}} \; (1/2) \left\| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \right\|_2^2 + \lambda \sum_m \| \mathbf{y}_m \|_1 + (\mu/2) \sum_i \sum_m \| G_i \mathbf{x}_m \|_2^2 \quad \text{such that} \quad \mathbf{x}_m = \mathbf{y}_m \;\;. After termination of the :meth:`solve` method, attribute :attr:`itstat` is a list of tuples representing statistics of each iteration. The fields of the named tuple ``IterationStats`` are: ``Iter`` : Iteration number ``ObjFun`` : Objective function value ``DFid`` : Value of data fidelity term :math:`(1/2) \| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \|_2^2` ``RegL1`` : Value of regularisation term :math:`\sum_m \| \mathbf{x}_m \|_1` ``RegGrad`` : Value of regularisation term :math:`(1/2) \sum_i \sum_m \| G_i \mathbf{x}_m \|_2^2` ``PrimalRsdl`` : Norm of primal residual ``DualRsdl`` : Norm of dual residual ``EpsPrimal`` : Primal residual stopping tolerance :math:`\epsilon_{\mathrm{pri}}` ``EpsDual`` : Dual residual stopping tolerance :math:`\epsilon_{\mathrm{dua}}` ``Rho`` : Penalty parameter ``XSlvRelRes`` : Relative residual of X step solver ``Time`` : Cumulative run time """
[docs] class Options(ConvBPDN.Options): r"""ConvBPDNGradReg algorithm options Options include all of those defined in :class:`ConvBPDN.Options`, together with additional options: ``GradWeight`` : An array of weights :math:`w_m` for the term penalising the gradient of the coefficient maps. If this option is defined, the gradient regularization term is :math:`\sum_i \sum_m w_m \| G_i \mathbf{x}_m \|_2^2` where :math:`w_m` is the weight for filter index :math:`m`. The array should be an :math:`M`-vector where :math:`M` is the number of filters in the dictionary. """ defaults = copy.deepcopy(ConvBPDN.Options.defaults) defaults.update({'GradWeight': 1.0}) def __init__(self, opt=None): """ Parameters ---------- opt : dict or None, optional (default None) ConvBPDNGradReg algorithm options """ if opt is None: opt = {} ConvBPDN.Options.__init__(self, opt)
itstat_fields_objfn = ('ObjFun', 'DFid', 'RegL1', 'RegGrad') hdrtxt_objfn = ('Fnc', 'DFid', u('Regℓ1'), u('Regℓ2∇')) hdrval_objfun = {'Fnc': 'ObjFun', 'DFid': 'DFid', u('Regℓ1'): 'RegL1', u('Regℓ2∇'): 'RegGrad'} def __init__(self, D, S, lmbda=None, mu=0.0, opt=None, dimK=None, dimN=2): """ | **Call graph** .. image:: ../_static/jonga/cbpdngrd_init.svg :width: 20% :target: ../_static/jonga/cbpdngrd_init.svg | Parameters ---------- D : array_like Dictionary matrix S : array_like Signal vector or matrix lmbda : float Regularisation parameter (l1) mu : float Regularisation parameter (gradient) opt : :class:`ConvBPDNGradReg.Options` object Algorithm options dimK : 0, 1, or None, optional (default None) Number of dimensions in input signal corresponding to multiple independent signals dimN : int, optional (default 2) Number of spatial dimensions """ if opt is None: opt = ConvBPDNGradReg.Options() self.cri = cr.CSC_ConvRepIndexing(D, S, dimK=dimK, dimN=dimN) # Set dtype attribute based on S.dtype and opt['DataType'] self.set_dtype(opt, S.dtype) self.mu = self.dtype.type(mu) if hasattr(opt['GradWeight'], 'ndim'): self.Wgrd = np.asarray(opt['GradWeight'].reshape((1,)*(dimN+2) + opt['GradWeight'].shape), dtype=self.dtype) else: self.Wgrd = np.asarray(opt['GradWeight'], dtype=self.dtype) self.Gf, GHGf = sl.GradientFilters(self.cri.dimN+3, self.cri.axisN, self.cri.Nv, dtype=self.dtype) self.GHGf = self.Wgrd * GHGf super(ConvBPDNGradReg, self).__init__(D, S, lmbda, opt, dimK=dimK, dimN=dimN)
[docs] def setdict(self, D=None): """Set dictionary array.""" if D is not None: self.D = np.asarray(D, dtype=self.dtype) self.Df = sl.rfftn(self.D, self.cri.Nv, self.cri.axisN) # Compute D^H S self.DSf = np.conj(self.Df) * self.Sf if self.cri.Cd > 1: self.DSf = np.sum(self.DSf, axis=self.cri.axisC, keepdims=True) if self.opt['HighMemSolve'] and self.cri.Cd == 1: self.c = sl.solvedbd_sm_c( self.Df, np.conj(self.Df), self.mu*self.GHGf + self.rho, self.cri.axisM) else: self.c = None
[docs] def xstep(self): r"""Minimise Augmented Lagrangian with respect to :math:`\mathbf{x}`. """ self.YU[:] = self.Y - self.U b = self.DSf + self.rho*sl.rfftn(self.YU, None, self.cri.axisN) if self.cri.Cd == 1: self.Xf[:] = sl.solvedbd_sm(self.Df, self.mu*self.GHGf + self.rho, b, self.c, self.cri.axisM) else: self.Xf[:] = sl.solvemdbi_ism(self.Df, self.mu*self.GHGf + self.rho, b, self.cri.axisM, self.cri.axisC) self.X = sl.irfftn(self.Xf, self.cri.Nv, self.cri.axisN) if self.opt['LinSolveCheck']: Dop = lambda x: sl.inner(self.Df, x, axis=self.cri.axisM) if self.cri.Cd == 1: DHop = lambda x: np.conj(self.Df) * x else: DHop = lambda x: sl.inner(np.conj(self.Df), x, axis=self.cri.axisC) ax = DHop(Dop(self.Xf)) + (self.mu*self.GHGf + self.rho)*self.Xf self.xrrs = sl.rrs(ax, b) else: self.xrrs = None
[docs] def obfn_reg(self): """Compute regularisation term and contribution to objective function. """ fvf = self.obfn_fvarf() rl1 = np.linalg.norm((self.wl1 * self.obfn_gvar()).ravel(), 1) rgr = sl.rfl2norm2(np.sqrt(self.GHGf*np.conj(fvf)*fvf), self.cri.Nv, self.cri.axisN)/2.0 return (self.lmbda*rl1 + self.mu*rgr, rl1, rgr)
[docs]class ConvBPDNProjL1(GenericConvBPDN): r""" ADMM algorithm for a ConvBPDN variant with projection onto the :math:`\ell_1` ball instead of an :math:`\ell_1` penalty. | .. inheritance-diagram:: ConvBPDNProjL1 :parts: 2 | Solve the optimisation problem .. math:: \mathrm{argmin}_\mathbf{x} \; (1/2) \left\| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \right\|_2^2 \; \text{such that} \; \sum_m \| \mathbf{x}_m \|_1 \leq \gamma via the ADMM problem .. math:: \mathrm{argmin}_{\mathbf{x}, \mathbf{y}} \; (1/2) \left\| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \right\|_2^2 + \iota_{C(\gamma)} (\{\mathbf{y}_m\}) \quad \text{such that} \quad \mathbf{x}_m = \mathbf{y}_m \;\;, where :math:`\iota_{C(\gamma)}(\cdot)` is the indicator function of the :math:`\ell_1` ball of radius :math:`\gamma` about the origin. The algorithm is very similar to that for the CBPDN problem (see :class:`ConvBPDN`), the only difference being in the replacement in the :math:`\mathbf{y}` step of the proximal operator of the :math:`\ell_1` norm with the projection operator of the :math:`\ell_1` norm. In particular, the :math:`\mathbf{x}` step uses the solver from :cite:`wohlberg-2014-efficient` for single-channel dictionaries, and the solver from :cite:`wohlberg-2016-convolutional` for multi-channel dictionaries. After termination of the :meth:`solve` method, attribute :attr:`itstat` is a list of tuples representing statistics of each iteration. The fields of the named tuple ``IterationStats`` are: ``Iter`` : Iteration number ``ObjFun`` : Objective function value ``Cnstr`` : Constraint violation measure ``PrimalRsdl`` : Norm of primal residual ``DualRsdl`` : Norm of dual residual ``EpsPrimal`` : Primal residual stopping tolerance :math:`\epsilon_{\mathrm{pri}}` ``EpsDual`` : Dual residual stopping tolerance :math:`\epsilon_{\mathrm{dua}}` ``Rho`` : Penalty parameter ``XSlvRelRes`` : Relative residual of X step solver ``Time`` : Cumulative run time """
[docs] class Options(GenericConvBPDN.Options): """ConvBPDNProjL1 algorithm options Options are the same as those defined in :class:`.GenericConvBPDN.Options`. """ defaults = copy.deepcopy(GenericConvBPDN.Options.defaults) defaults['AutoRho'].update({'RsdlTarget': 1.0}) def __init__(self, opt=None): """ Parameters ---------- opt : dict or None, optional (default None) ConvBPDNProjL1 algorithm options """ if opt is None: opt = {} GenericConvBPDN.Options.__init__(self, opt)
itstat_fields_objfn = ('ObjFun', 'Cnstr') hdrtxt_objfn = ('Fnc', 'Cnstr') hdrval_objfun = {'Fnc': 'ObjFun', 'Cnstr': 'Cnstr'} def __init__(self, D, S, gamma, opt=None, dimK=None, dimN=2): """ | **Call graph** .. image:: ../_static/jonga/cbpdnprjl1_init.svg :width: 20% :target: ../_static/jonga/cbpdnprjl1_init.svg | Parameters ---------- D : array_like Dictionary matrix S : array_like Signal vector or matrix gamma : float Constraint parameter opt : :class:`ConvBPDNProjL1.Options` object Algorithm options dimK : 0, 1, or None, optional (default None) Number of dimensions in input signal corresponding to multiple independent signals dimN : int, optional (default 2) Number of spatial dimensions """ # Set default options if necessary if opt is None: opt = ConvBPDNProjL1.Options() super(ConvBPDNProjL1, self).__init__(D, S, opt, dimK=dimK, dimN=dimN) self.gamma = self.dtype.type(gamma)
[docs] def uinit(self, ushape): """Return initialiser for working variable U.""" if self.opt['Y0'] is None: return np.zeros(ushape, dtype=self.dtype) else: # If initial Y is non-zero, initial U is chosen so that # the relevant dual optimality criterion (see (3.10) in # boyd-2010-distributed) is satisfied. # NB: still needs to be worked out. return np.zeros(ushape, dtype=self.dtype)
[docs] def ystep(self): r"""Minimise Augmented Lagrangian with respect to :math:`\mathbf{y}`. """ self.Y = sp.proj_l1(self.AX + self.U, self.gamma, axis=self.cri.axisN + (self.cri.axisC, self.cri.axisM)) super(ConvBPDNProjL1, self).ystep()
[docs] def eval_objfn(self): """Compute components of regularisation function as well as total objective function. """ dfd = self.obfn_dfd() prj = sp.proj_l1(self.obfn_gvar(), self.gamma, axis=self.cri.axisN + (self.cri.axisC, self.cri.axisM)) cns = np.linalg.norm(prj - self.obfn_gvar()) return (dfd, cns)
[docs]class ConvTwoBlockCnstrnt(admm.ADMMTwoBlockCnstrnt): r""" Base class for ADMM algorithms for problems of the form .. math:: \mathrm{argmin}_\mathbf{x} \; g_0(D \mathbf{x} - \mathbf{s}) + g_1(\mathbf{x}) \;\;, where :math:`D \mathbf{x} = \sum_m \mathbf{d}_m * \mathbf{x}_m`. | .. inheritance-diagram:: ConvTwoBlockCnstrnt :parts: 2 | The problem is solved via an ADMM problem of the form .. math:: \mathrm{argmin}_{\mathbf{x},\mathbf{y}_0,\mathbf{y}_1} \; g_0(\mathbf{y}_0) + g_1(\mathbf{y}_1) \;\text{such that}\; \left( \begin{array}{c} D \\ I \end{array} \right) \mathbf{x} - \left( \begin{array}{c} \mathbf{y}_0 \\ \mathbf{y}_1 \end{array} \right) = \left( \begin{array}{c} \mathbf{s} \\ \mathbf{0} \end{array} \right) \;\;. In this case the ADMM constraint is :math:`A\mathbf{x} + B\mathbf{y} = \mathbf{c}` where .. math:: A = \left( \begin{array}{c} D \\ I \end{array} \right) \qquad B = -I \qquad \mathbf{y} = \left( \begin{array}{c} \mathbf{y}_0 \\ \mathbf{y}_1 \end{array} \right) \qquad \mathbf{c} = \left( \begin{array}{c} \mathbf{s} \\ \mathbf{0} \end{array} \right) \;\;. | The implementation of this class is substantially complicated by the support of multi-channel signals. In the following, the number of channels in the signal and dictionary are denoted by ``C`` and ``Cd`` respectively, the number of signals and the number of filters are denoted by ``K`` and ``M`` respectively, ``D``, ``X``, and ``S`` denote the dictionary, coefficient map, and signal arrays respectively, and ``Y0`` and ``Y1`` denote blocks 0 and 1 of the auxiliary (split) variable of the ADMM problem. We need to consider three different cases: 1. Single channel signal and dictionary (``C`` = ``Cd`` = 1) 2. Multi-channel signal, single channel dictionary (``C`` > 1, ``Cd`` = 1) 3. Multi-channel signal and dictionary (``C`` = ``Cd`` > 1) The final three (non-spatial) dimensions of the main variables in each of these cases are as in the following table: ====== ================== ===================== ================== Var. ``C`` = ``Cd`` = 1 ``C`` > 1, ``Cd`` = 1 ``C`` = ``Cd`` > 1 ====== ================== ===================== ================== ``D`` 1 x 1 x ``M`` 1 x 1 x ``M`` ``Cd`` x 1 x ``M`` ``X`` 1 x ``K`` x ``M`` ``C`` x ``K`` x ``M`` 1 x ``K`` x ``M`` ``S`` 1 x ``K`` x 1 ``C`` x ``K`` x 1 ``C`` x ``K`` x 1 ``Y0`` 1 x ``K`` x 1 ``C`` x ``K`` x 1 ``C`` x ``K`` x 1 ``Y1`` 1 x ``K`` x ``M`` ``C`` x ``K`` x ``M`` 1 x ``K`` x ``M`` ====== ================== ===================== ================== In order to combine the block components ``Y0`` and ``Y1`` of variable ``Y`` into a single array, we need to be able to concatenate the two component arrays on one of the axes. The final ``M`` axis is suitable in the first two cases, but it is not possible to concatenate ``Y0`` and ``Y1`` on the final axis in case 3. The solution is that, in case 3, the the ``C`` and ``M`` axes of ``Y0`` are swapped before concatenating, as well as after extracting the ``Y0`` component from the concatenated ``Y`` variable (see :meth:`.block_sep0` and :meth:`block_cat`). | This class specialises class :class:`.ADMMTwoBlockCnstrnt`, but remains a base class for other classes that specialise to specific optimisation problems. """
[docs] class Options(admm.ADMMTwoBlockCnstrnt.Options): """ConvTwoBlockCnstrnt algorithm options Options include all of those defined in :class:`.ADMMTwoBlockCnstrnt.Options`, together with additional options: ``LinSolveCheck`` : Flag indicating whether to compute relative residual of X step solver. ``HighMemSolve`` : Flag indicating whether to use a slightly faster algorithm at the expense of higher memory usage. ``NonNegCoef`` : Flag indicating whether to force solution to be non-negative. ``NoBndryCross`` : Flag indicating whether all solution coefficients corresponding to filters crossing the image boundary should be forced to zero. """ defaults = copy.deepcopy(admm.ADMMEqual.Options.defaults) defaults.update({'AuxVarObj': False, 'fEvalX': True, 'gEvalY': False, 'HighMemSolve': False, 'LinSolveCheck': False, 'NonNegCoef': False, 'NoBndryCross': False, 'RelaxParam': 1.8, 'rho': 1.0, 'ReturnVar': 'Y1'}) def __init__(self, opt=None): """ Parameters ---------- opt : dict or None, optional (default None) ConvTwoBlockCnstrnt algorithm options """ if opt is None: opt = {} admm.ADMMTwoBlockCnstrnt.Options.__init__(self, opt)
itstat_fields_objfn = ('ObjFun', 'G0Val', 'G1Val') itstat_fields_extra = ('XSlvRelRes',) hdrtxt_objfn = ('Fnc', 'g0', 'g1') hdrval_objfun = {'Fnc': 'ObjFun', 'g0': 'G0Val', 'g1': 'G1Val'} def __init__(self, D, S, opt=None, dimK=None, dimN=2): """ Parameters ---------- D : array_like Dictionary array S : array_like Signal array opt : :class:`ConvTwoBlockCnstrnt.Options` object Algorithm options dimK : 0, 1, or None, optional (default None) Number of dimensions in input signal corresponding to multiple independent signals dimN : int, optional (default 2) Number of spatial dimensions """ # Infer problem dimensions and set relevant attributes of self self.cri = cr.CSC_ConvRepIndexing(D, S, dimK=dimK, dimN=dimN) # Determine whether axis swapping on Y block 0 is necessary self.y0swapaxes = bool(self.cri.C > 1 and self.cri.Cd > 1) # Call parent class __init__ Nx = self.cri.M * self.cri.N * self.cri.K shpY = list(self.cri.shpX) if self.y0swapaxes: shpY[self.cri.axisC] = 1 shpY[self.cri.axisM] += self.cri.Cd super(ConvTwoBlockCnstrnt, self).__init__(Nx, shpY, self.cri.axisM, self.cri.Cd, S.dtype, opt) # Reshape D and S to standard layout self.D = np.asarray(D.reshape(self.cri.shpD), dtype=self.dtype) self.S = np.asarray(S.reshape(self.cri.shpS), dtype=self.dtype) # Initialise byte-aligned arrays for pyfftw self.YU = sl.pyfftw_empty_aligned(self.Y.shape, dtype=self.dtype) self.Xf = sl.pyfftw_rfftn_empty_aligned(self.cri.shpX, self.cri.axisN, self.dtype) self.setdict()
[docs] def setdict(self, D=None): """Set dictionary array.""" if D is not None: self.D = np.asarray(D, dtype=self.dtype) self.Df = sl.rfftn(self.D, self.cri.Nv, self.cri.axisN) if self.opt['HighMemSolve'] and self.cri.Cd == 1: self.c = sl.solvedbi_sm_c(self.Df, np.conj(self.Df), 1.0, self.cri.axisM) else: self.c = None
[docs] def getcoef(self): """Get final coefficient array.""" return self.getmin()
[docs] def xstep(self): r"""Minimise Augmented Lagrangian with respect to :math:`\mathbf{x}`. """ self.YU[:] = self.Y - self.U self.block_sep0(self.YU)[:] += self.S YUf = sl.rfftn(self.YU, None, self.cri.axisN) if self.cri.Cd == 1: b = np.conj(self.Df) * self.block_sep0(YUf) + self.block_sep1(YUf) else: b = sl.inner(np.conj(self.Df), self.block_sep0(YUf), axis=self.cri.axisC) + self.block_sep1(YUf) if self.cri.Cd == 1: self.Xf[:] = sl.solvedbi_sm(self.Df, 1.0, b, self.c, self.cri.axisM) else: self.Xf[:] = sl.solvemdbi_ism(self.Df, 1.0, b, self.cri.axisM, self.cri.axisC) self.X = sl.irfftn(self.Xf, self.cri.Nv, self.cri.axisN) if self.opt['LinSolveCheck']: Dop = lambda x: sl.inner(self.Df, x, axis=self.cri.axisM) if self.cri.Cd == 1: DHop = lambda x: np.conj(self.Df) * x else: DHop = lambda x: sl.inner(np.conj(self.Df), x, axis=self.cri.axisC) ax = DHop(Dop(self.Xf)) + self.Xf self.xrrs = sl.rrs(ax, b) else: self.xrrs = None
[docs] def ystep(self): r"""Minimise Augmented Lagrangian with respect to :math:`\mathbf{y}`. """ if self.opt['NonNegCoef'] or self.opt['NoBndryCross']: Y1 = self.block_sep1(self.Y) if self.opt['NonNegCoef']: Y1[Y1 < 0.0] = 0.0 if self.opt['NoBndryCross']: for n in range(0, self.cri.dimN): Y1[(slice(None),)*n + (slice(1-self.D.shape[n], None),)] = 0.0 self.block_sep1(self.Y)[:] = Y1
[docs] def relax_AX(self): """Implement relaxation if option ``RelaxParam`` != 1.0.""" self.AXnr = self.cnst_A(self.X, self.Xf) if self.rlx == 1.0: self.AX = self.AXnr else: if not hasattr(self, 'c0'): self.c0 = self.cnst_c0() if not hasattr(self, 'c1'): self.c1 = self.cnst_c1() alpha = self.rlx self.AX = alpha*self.AXnr + (1-alpha)*self.block_cat( self.var_y0() + self.c0, self.var_y1() + self.c1)
[docs] def block_sep0(self, Y): r"""Separate variable into component corresponding to :math:`\mathbf{y}_0` in :math:`\mathbf{y}\;\;`. The method from parent class :class:`.ADMMTwoBlockCnstrnt` is overridden here to allow swapping of C (channel) and M (filter) axes in block 0 so that it can be concatenated on axis M with block 1. This is necessary because block 0 has the dimensions of S (N x C x K x 1) while block 1 has the dimensions of X (N x 1 x K x M). """ if self.y0swapaxes: return np.swapaxes(Y[(slice(None),)*self.blkaxis + (slice(0, self.blkidx),)], self.cri.axisC, self.cri.axisM) else: return super(ConvTwoBlockCnstrnt, self).block_sep0(Y)
[docs] def block_cat(self, Y0, Y1): r"""Concatenate components corresponding to :math:`\mathbf{y}_0` and :math:`\mathbf{y}_1` to form :math:`\mathbf{y}\;\;`. The method from parent class :class:`.ADMMTwoBlockCnstrnt` is overridden here to allow swapping of C (channel) and M (filter) axes in block 0 so that it can be concatenated on axis M with block 1. This is necessary because block 0 has the dimensions of S (N x C x K x 1) while block 1 has the dimensions of X (N x 1 x K x M). """ if self.y0swapaxes: return np.concatenate((np.swapaxes(Y0, self.cri.axisC, self.cri.axisM), Y1), axis=self.blkaxis) else: return super(ConvTwoBlockCnstrnt, self).block_cat(Y0, Y1)
[docs] def cnst_A(self, X, Xf=None): r"""Compute :math:`A \mathbf{x}` component of ADMM problem constraint. """ return self.block_cat(self.cnst_A0(X, Xf), self.cnst_A1(X))
[docs] def obfn_g0var(self): """Variable to be evaluated in computing :meth:`.ADMMTwoBlockCnstrnt.obfn_g0`, depending on the ``AuxVarObj`` option value. """ return self.var_y0() if self.opt['AuxVarObj'] else \ self.cnst_A0(None, self.Xf) - self.cnst_c0()
[docs] def cnst_A0(self, X, Xf=None): r"""Compute :math:`A_0 \mathbf{x}` component of ADMM problem constraint. """ # This calculation involves non-negligible computational cost # when Xf is None (i.e. the function is not being applied to # self.X). if Xf is None: Xf = sl.rfftn(X, None, self.cri.axisN) return sl.irfftn(sl.inner(self.Df, Xf, axis=self.cri.axisM), self.cri.Nv, self.cri.axisN)
[docs] def cnst_A0T(self, Y0): r"""Compute :math:`A_0^T \mathbf{y}_0` component of :math:`A^T \mathbf{y}` (see :meth:`.ADMMTwoBlockCnstrnt.cnst_AT`). """ # This calculation involves non-negligible computational cost. It # should be possible to disable relevant diagnostic information # (dual residual) to avoid this cost. Y0f = sl.rfftn(Y0, None, self.cri.axisN) if self.cri.Cd == 1: return sl.irfftn(np.conj(self.Df) * Y0f, self.cri.Nv, self.cri.axisN) else: return sl.irfftn(sl.inner( np.conj(self.Df), Y0f, axis=self.cri.axisC), self.cri.Nv, self.cri.axisN)
[docs] def cnst_c0(self): r"""Compute constant component :math:`\mathbf{c}_0` of :math:`\mathbf{c}` in the ADMM problem constraint. """ return self.S
[docs] def eval_objfn(self): """Compute components of regularisation function as well as total contribution to objective function. """ g0v = self.obfn_g0(self.obfn_g0var()) g1v = self.obfn_g1(self.obfn_g1var()) obj = g0v + g1v return (obj, g0v, g1v)
[docs] def itstat_extra(self): """Non-standard entries for the iteration stats record tuple.""" return (self.xrrs,)
[docs] def reconstruct(self, X=None): """Reconstruct representation.""" if X is None: Xf = self.Xf else: Xf = sl.rfftn(X, None, self.cri.axisN) Sf = np.sum(self.Df * Xf, axis=self.cri.axisM) return sl.irfftn(Sf, self.cri.Nv, self.cri.axisN)
[docs] def rsdl_s(self, Yprev, Y): """Compute dual residual vector.""" return self.rho * self.cnst_AT(self.U)
[docs] def rsdl_sn(self, U): """Compute dual residual normalisation term.""" return self.rho * np.linalg.norm(U)
[docs]class ConvMinL1InL2Ball(ConvTwoBlockCnstrnt): r""" ADMM algorithm for the problem with an :math:`\ell_1` objective and an :math:`\ell_2` constraint, following the general approach proposed in :cite:`afonso-2011-augmented`. | .. inheritance-diagram:: ConvMinL1InL2Ball :parts: 2 | The :math:`\mathbf{y}` step is essentially the same as that of :class:`.admm.bpdn.MinL1InL2Ball` (with the trivial difference of a swap between the roles of :math:`\mathbf{y}_0` and :math:`\mathbf{y}_1`). The :math:`\mathbf{x}` step uses the solver from :cite:`wohlberg-2014-efficient` for single-channel dictionaries, and the solver from :cite:`wohlberg-2016-convolutional` for multi-channel dictionaries. Solve the Single Measurement Vector (SMV) problem .. math:: \mathrm{argmin}_\mathbf{x} \sum_m \| \mathbf{x}_m \|_1 \; \text{such that} \; \left\| \sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s} \right\|_2 \leq \epsilon via the ADMM problem .. math:: \mathrm{argmin}_{\mathbf{x},\mathbf{y}_0,\mathbf{y}_1} \; \| \mathbf{y}_1 \|_1 + \iota_{C(\epsilon)}(\mathbf{y}_0) \;\text{such that}\; \left( \begin{array}{c} D \\ I \end{array} \right) \mathbf{x} - \left( \begin{array}{c} \mathbf{y}_0 \\ \mathbf{y}_1 \end{array} \right) = \left( \begin{array}{c} \mathbf{s} \\ \mathbf{0} \end{array} \right) \;\;, where :math:`\iota_{C(\epsilon)}(\cdot)` is the indicator function of the :math:`\ell_2` ball of radius :math:`\epsilon` about the origin, and :math:`D \mathbf{x} = \sum_m \mathbf{d}_m * \mathbf{x}_m`. The Multiple Measurement Vector (MMV) problem .. math:: \mathrm{argmin}_\mathbf{x} \sum_k \sum_m \| \mathbf{x}_{k,m} \|_1 \; \text{such that} \; \left\| \sum_m \mathbf{d}_m * \mathbf{x}_{k,m} - \mathbf{s}_k \right\|_2 \leq \epsilon \;\;\; \forall k \;\;, is also supported. After termination of the :meth:`solve` method, attribute :attr:`itstat` is a list of tuples representing statistics of each iteration. The fields of the named tuple ``IterationStats`` are: ``Iter`` : Iteration number ``ObjFun`` : Objective function value :math:`\| \mathbf{x} \|_1` ``Cnstr`` : Constraint violation measure ``PrimalRsdl`` : Norm of primal residual ``DualRsdl`` : Norm of dual residual ``EpsPrimal`` : Primal residual stopping tolerance :math:`\epsilon_{\mathrm{pri}}` ``EpsDual`` : Dual residual stopping tolerance :math:`\epsilon_{\mathrm{dua}}` ``Rho`` : Penalty parameter ``Time`` : Cumulative run time """
[docs] class Options(ConvTwoBlockCnstrnt.Options): r"""ConvMinL1InL2Ball algorithm options Options include all of those defined in :class:`ConvTwoBlockCnstrnt.Options`, together with additional options: ``L1Weight`` : An array of weights for the :math:`\ell_1` norm. The array shape must be such that the array is compatible for multiplication with the `X`/`Y` variables (see :func:`.cnvrep.l1Wshape` for more details). If this option is defined, the objective function is :math:`\lambda \| \mathbf{w} \odot \mathbf{x} \|_1` where :math:`\mathbf{w}` denotes the weighting array. ``NonNegCoef`` : If ``True``, force solution to be non-negative. """ defaults = copy.deepcopy(ConvTwoBlockCnstrnt.Options.defaults) defaults.update({'AuxVarObj': False, 'fEvalX': True, 'gEvalY': False, 'RelaxParam': 1.8, 'L1Weight': 1.0, 'NonNegCoef': False, 'ReturnVar': 'Y1'}) defaults['AutoRho'].update({'Enabled': True, 'Period': 10, 'AutoScaling': True, 'Scaling': 1000.0, 'RsdlRatio': 1.2, 'RsdlTarget': 1.0}) def __init__(self, opt=None): """ Parameters ---------- opt : dict or None, optional (default None) ConvMinL1InL2Ball algorithm options """ if opt is None: opt = {} ConvTwoBlockCnstrnt.Options.__init__(self, opt)
itstat_fields_objfn = ('ObjFun', 'Cnstr') hdrtxt_objfn = ('Fnc', 'Cnstr') hdrval_objfun = {'Fnc': 'ObjFun', 'Cnstr': 'Cnstr'} def __init__(self, D, S, epsilon, opt=None, dimK=None, dimN=2): r""" | **Call graph** .. image:: ../_static/jonga/cbpdnml1l2_init.svg :width: 20% :target: ../_static/jonga/cbpdnml1l2_init.svg | Parameters ---------- D : array_like Dictionary matrix S : array_like Signal vector or matrix epsilon : float :math:`\ell_2` ball radius opt : :class:`ConvMinL1InL2Ball.Options` object Algorithm options dimK : 0, 1, or None, optional (default None) Number of dimensions in input signal corresponding to multiple independent signals dimN : int, optional (default 2) Number of spatial dimensions """ if opt is None: opt = ConvMinL1InL2Ball.Options() self.S = S super(ConvMinL1InL2Ball, self).__init__(D, S, opt, dimK=dimK, dimN=dimN) # Set l1 term weight array self.wl1 = np.asarray(opt['L1Weight'], dtype=self.dtype) self.wl1 = self.wl1.reshape(cr.l1Wshape(self.wl1, self.cri)) # Record epsilon value self.epsilon = self.dtype.type(epsilon)
[docs] def uinit(self, ushape): """Return initialiser for working variable U.""" if self.opt['Y0'] is None: return np.zeros(ushape, dtype=self.dtype) else: # If initial Y is non-zero, initial U is chosen so that # the relevant dual optimality criterion (see (3.10) in # boyd-2010-distributed) is satisfied. U0 = np.sign(self.block_sep0(self.Y)) / self.rho U1 = self.block_sep1(self.Y) - sl.atleast_nd(self.cri.dimN+3, self.S) return self.block_cat(U0, U1)
[docs] def ystep(self): r"""Minimise Augmented Lagrangian with respect to :math:`\mathbf{y}`. """ AXU = self.AX + self.U Y0 = sp.proj_l2(self.block_sep0(AXU) - self.S, self.epsilon, axis=self.cri.axisN) Y1 = sp.prox_l1(self.block_sep1(AXU), self.wl1 / self.rho) self.Y = self.block_cat(Y0, Y1) super(ConvMinL1InL2Ball, self).ystep()
[docs] def obfn_g0(self, Y0): r"""Compute :math:`g_0(\mathbf{y}_0)` component of ADMM objective function. """ return np.linalg.norm(sp.proj_l2(Y0, self.epsilon, axis=self.cri.axisN) - Y0)
[docs] def obfn_g1(self, Y1): r"""Compute :math:`g_1(\mathbf{y_1})` component of ADMM objective function. """ return np.linalg.norm((self.wl1 * Y1).ravel(), 1)
[docs] def eval_objfn(self): """Compute components of regularisation function as well as total contribution to objective function. """ g0v = self.obfn_g0(self.obfn_g0var()) g1v = self.obfn_g1(self.obfn_g1var()) return (g1v, g0v)
[docs]class ConvBPDNMaskDcpl(ConvTwoBlockCnstrnt): r""" ADMM algorithm for Convolutional BPDN with Mask Decoupling :cite:`heide-2015-fast`. | .. inheritance-diagram:: ConvBPDNMaskDcpl :parts: 2 | Solve the optimisation problem .. math:: \mathrm{argmin}_\mathbf{x} \; (1/2) \left\| W \left(\sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s}\right) \right\|_2^2 + \lambda \sum_m \| \mathbf{x}_m \|_1 \;\;, where :math:`W` is a mask array, via the ADMM problem .. math:: \mathrm{argmin}_{\mathbf{x},\mathbf{y}_0,\mathbf{y}_1} \; (1/2) \| W \mathbf{y}_0 \|_2^2 + \lambda \| \mathbf{y}_1 \|_1 \;\text{such that}\; \left( \begin{array}{c} D \\ I \end{array} \right) \mathbf{x} - \left( \begin{array}{c} \mathbf{y}_0 \\ \mathbf{y}_1 \end{array} \right) = \left( \begin{array}{c} \mathbf{s} \\ \mathbf{0} \end{array} \right) \;\;, where :math:`D \mathbf{x} = \sum_m \mathbf{d}_m * \mathbf{x}_m`. After termination of the :meth:`solve` method, attribute :attr:`itstat` is a list of tuples representing statistics of each iteration. The fields of the named tuple ``IterationStats`` are: ``Iter`` : Iteration number ``ObjFun`` : Objective function value ``DFid`` : Value of data fidelity term :math:`(1/2) \| W (\sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s}) \|_2^2` ``RegL1`` : Value of regularisation term :math:`\sum_m \| \mathbf{x}_m \|_1` ``PrimalRsdl`` : Norm of primal residual ``DualRsdl`` : Norm of dual residual ``EpsPrimal`` : Primal residual stopping tolerance :math:`\epsilon_{\mathrm{pri}}` ``EpsDual`` : Dual residual stopping tolerance :math:`\epsilon_{\mathrm{dua}}` ``Rho`` : Penalty parameter ``XSlvRelRes`` : Relative residual of X step solver ``Time`` : Cumulative run time """
[docs] class Options(ConvTwoBlockCnstrnt.Options): r"""ConvBPDNMaskDcpl algorithm options Options include all of those defined in :class:`ConvTwoBlockCnstrnt.Options`, together with additional options: ``L1Weight`` : An array of weights for the :math:`\ell_1` norm. The array shape must be such that the array is compatible for multiplication with the `X` variable (see :func:`.cnvrep.l1Wshape` for more details). If this option is defined, the regularization term is :math:`\lambda \sum_m \| \mathbf{w}_m \odot \mathbf{x}_m \|_1` where :math:`\mathbf{w}_m` denotes slices of the weighting array on the filter index axis. """ defaults = copy.deepcopy(ConvTwoBlockCnstrnt.Options.defaults) defaults.update({'L1Weight': 1.0}) def __init__(self, opt=None): """ Parameters ---------- opt : dict or None, optional (default None) ConvBPDNMaskDcpl algorithm options """ if opt is None: opt = {} ConvTwoBlockCnstrnt.Options.__init__(self, opt)
itstat_fields_objfn = ('ObjFun', 'DFid', 'RegL1') hdrtxt_objfn = ('Fnc', 'DFid', u('Regℓ1')) hdrval_objfun = {'Fnc': 'ObjFun', 'DFid': 'DFid', u('Regℓ1'): 'RegL1'} def __init__(self, D, S, lmbda, W=None, opt=None, dimK=None, dimN=2): """ | **Call graph** .. image:: ../_static/jonga/cbpdnmd_init.svg :width: 20% :target: ../_static/jonga/cbpdnmd_init.svg | Parameters ---------- D : array_like Dictionary matrix S : array_like Signal vector or matrix lmbda : float Regularisation parameter W : array_like Mask array. The array shape must be such that the array is compatible for multiplication with input array S (see :func:`.cnvrep.mskWshape` for more details). opt : :class:`ConvBPDNMaskDcpl.Options` object Algorithm options dimK : 0, 1, or None, optional (default None) Number of dimensions in input signal corresponding to multiple independent signals dimN : int, optional (default 2) Number of spatial dimensions """ if opt is None: opt = ConvBPDNMaskDcpl.Options() super(ConvBPDNMaskDcpl, self).__init__(D, S, opt, dimK=dimK, dimN=dimN) self.lmbda = self.dtype.type(lmbda) if W is None: W = np.array([1.0], dtype=self.dtype) self.W = np.asarray(W.reshape(cr.mskWshape(W, self.cri)), dtype=self.dtype) self.wl1 = np.asarray(opt['L1Weight'], dtype=self.dtype) self.wl1 = self.wl1.reshape(cr.l1Wshape(self.wl1, self.cri))
[docs] def uinit(self, ushape): """Return initialiser for working variable U.""" if self.opt['Y0'] is None: return np.zeros(ushape, dtype=self.dtype) else: # If initial Y is non-zero, initial U is chosen so that # the relevant dual optimality criterion (see (3.10) in # boyd-2010-distributed) is satisfied. Ub0 = (self.W**2) * self.block_sep0(self.Y) / self.rho Ub1 = (self.lmbda/self.rho) * np.sign(self.block_sep1(self.Y)) return self.block_cat(Ub0, Ub1)
[docs] def ystep(self): r"""Minimise Augmented Lagrangian with respect to :math:`\mathbf{y}`. """ AXU = self.AX + self.U Y0 = (self.rho*(self.block_sep0(AXU) - self.S)) / \ (self.W**2 + self.rho) Y1 = sp.prox_l1(self.block_sep1(AXU), (self.lmbda / self.rho) * self.wl1) self.Y = self.block_cat(Y0, Y1) super(ConvBPDNMaskDcpl, self).ystep()
[docs] def eval_objfn(self): """Compute components of regularisation function as well as total contribution to objective function. """ g0v = self.obfn_g0(self.obfn_g0var()) g1v = self.obfn_g1(self.obfn_g1var()) obj = g0v + self.lmbda*g1v return (obj, g0v, g1v)
[docs] def obfn_g0(self, Y0): r"""Compute :math:`g_0(\mathbf{y}_0)` component of ADMM objective function. """ return (np.linalg.norm(self.W * Y0)**2) / 2.0
[docs] def obfn_g1(self, Y1): r"""Compute :math:`g_1(\mathbf{y_1})` component of ADMM objective function. """ return np.linalg.norm((self.wl1 * Y1).ravel(), 1)
[docs]class AddMaskSim(object): """Boundary masking for convolutional representations using the Additive Mask Simulation (AMS) technique described in :cite:`wohlberg-2016-boundary`. Implemented as a wrapper about a cbpdn.ConvBPDN or derived object (or any other object with sufficiently similar interface and internals). The wrapper is largely transparent, but must be taken into account when setting some of the options for the inner object, e.g. the shape of the ``L1Weight`` option array must take into account the extra dictionary atom appended by the wrapper. """ def __init__(self, cbpdnclass, D, S, W, *args, **kwargs): """ Parameters ---------- cbpdnclass : class name Type of internal cbpdn object (e.g. cbpdn.ConvBPDN) to be constructed D : array_like Dictionary array S : array_like Signal array W : array_like Mask array. The array shape must be such that the array is compatible for multiplication with input array S (see :func:`.cnvrep.mskWshape` for more details). *args Variable length list of arguments for constructor of internal cbpdn object **kwargs Keyword arguments for constructor of internal cbpdn object """ # Number of channel dimensions if 'dimK' in kwargs: dimK = kwargs['dimK'] else: dimK = None # Number of spatial dimensions if 'dimN' in kwargs: dimN = kwargs['dimN'] else: dimN = 2 # Infer problem dimensions self.cri = cr.CSC_ConvRepIndexing(D, S, dimK=dimK, dimN=dimN) # Construct impulse filter (or filters for the multi-channel # case) and append to dictionary if self.cri.Cd == 1: self.imp = np.zeros(D.shape[0:dimN] + (1,)) self.imp[(0,)*dimN] = 1.0 else: self.imp = np.zeros(D.shape[0:dimN] + (self.cri.Cd,)*2) for c in range(0, self.cri.Cd): self.imp[(0,)*dimN + (c, c,)] = 1.0 Di = np.concatenate((D, self.imp), axis=D.ndim-1) # Construct inner cbpdn object self.cbpdn = cbpdnclass(Di, S, *args, **kwargs) # Required because dictlrn.DictLearn assumes that all valid # xstep objects have an IterationStats attribute self.IterationStats = self.cbpdn.IterationStats # Mask matrix self.W = np.asarray(W.reshape(cr.mskWshape(W, self.cri)), dtype=self.cbpdn.dtype) # If Cd > 1 (i.e. a multi-channel dictionary) and mask has a # non-singleton channel dimension, swap that axis onto the # dictionary filter index dimension (where the # multiple-channel impulse filters are located) if self.cri.Cd > 1 and self.W.shape[self.cri.dimN] > 1: self.W = np.swapaxes(self.W, self.cri.axisC, self.cri.axisM) # Record ystep method of inner cbpdn object self.inner_ystep = self.cbpdn.ystep # Replace ystep method of inner cbpdn object with outer ystep self.cbpdn.ystep = MethodType(AddMaskSim.ystep, self) # Record obfn_gvar method of inner cbpdn object self.inner_obfn_gvar = self.cbpdn.obfn_gvar # Replace obfn_gvar method of inner cbpdn object with outer obfn_gvar self.cbpdn.obfn_gvar = MethodType(AddMaskSim.obfn_gvar, self)
[docs] def ystep(self): """This method is inserted into the inner cbpdn object, replacing its own ystep method, thereby providing a hook for applying the additional steps necessary for the AMS method. """ # Extract AMS part of ystep argument so that it is not # affected by the main part of the ystep amidx = self.index_addmsk() Yi = self.cbpdn.AX[amidx] + self.cbpdn.U[amidx] # Perform main part of ystep from inner cbpdn object self.inner_ystep() # Apply mask to AMS component and insert into Y from inner # cbpdn object Yi[np.where(self.W.astype(np.bool))] = 0.0 self.cbpdn.Y[amidx] = Yi
[docs] def obfn_gvar(self): """This method is inserted into the inner cbpdn object, replacing its own obfn_gvar method, thereby providing a hook for applying the additional steps necessary for the AMS method. """ # Get inner cbpdn object gvar gv = self.inner_obfn_gvar().copy() # Set slice corresponding to the coefficient map of the final # filter (the impulse inserted for the AMS method) to zero so # that it does not affect the results (e.g. l1 norm) computed # from this variable by the inner cbpdn object gv[..., -self.cri.Cd:] = 0 return gv
[docs] def solve(self): """Call the solve method of the inner cbpdn object and strip the AMS component from the returned result. """ # Call solve method of inner cbpdn object Xi = self.cbpdn.solve() # Copy attributes from inner cbpdn object self.timer = self.cbpdn.timer self.itstat = self.cbpdn.itstat # Return result of inner cbpdn object with AMS component removed return Xi[self.index_primary()]
[docs] def setdict(self, D=None): """Set dictionary array.""" Di = np.concatenate((D, sl.atleast_nd(D.ndim, self.imp)), axis=D.ndim-1) self.cbpdn.setdict(Di)
[docs] def getcoef(self): """Get result of inner cbpdn object with AMS component removed.""" return self.cbpdn.getcoef()[self.index_primary()]
[docs] def index_primary(self): """Return an index expression appropriate for extracting the primary (inner) component of the main variables X, Y, etc. """ return np.s_[..., 0:-self.cri.Cd]
[docs] def index_addmsk(self): """Return an index expression appropriate for extracting the additive mask (outer) component of the main variables X, Y, etc.""" return np.s_[..., -self.cri.Cd:]
[docs] def reconstruct(self, X=None): """Reconstruct representation.""" # If coefficient array not specified, use non-AMS part of Y from inner # cbpdn object if X is None: X = self.cbpdn.Y[self.index_primary()] # FFT of coefficient array Xf = sl.rfftn(X, None, self.cri.axisN) # Multiply in frequency domain with non-impulse component of # dictionary Sf = np.sum(self.cbpdn.Df[..., 0:-self.cri.Cd] * Xf, axis=self.cri.axisM) # Transform to spatial domain and return result return sl.irfftn(Sf, self.cri.Nv, self.cri.axisN)
[docs] def getitstat(self): """Get iteration stats from inner cbpdn object.""" return self.cbpdn.getitstat()
[docs]class ConvL1L1Grd(ConvBPDNMaskDcpl): r""" ADMM algorithm for a Convolutional Sparse Coding problem with an :math:`\ell_1` data fidelity term and both :math:`\ell_1` and :math:`\ell_2` of gradient regularisation terms :cite:`wohlberg-2016-convolutional2`. | .. inheritance-diagram:: ConvL1L1Grd :parts: 2 | Solve the optimisation problem .. math:: \mathrm{argmin}_\mathbf{x} \; \left\| W \left(\sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s}\right) \right\|_1 + \lambda \sum_m \| \mathbf{x}_m \|_1 + (\mu/2) \sum_i \sum_m \| G_i \mathbf{x}_m \|_2^2\;\;, where :math:`W` is a mask array and :math:`G_i` is an operator computing the derivative along index :math:`i`, via the ADMM problem .. math:: \mathrm{argmin}_{\mathbf{x},\mathbf{y}_0,\mathbf{y}_1} \; \| W \mathbf{y}_0 \|_1 + \lambda \| \mathbf{y}_1 \|_1 + (\mu/2) \sum_i \| \Gamma_i \mathbf{x} \|_2^2 \;\text{such that}\; \left( \begin{array}{c} D \\ I \end{array} \right) \mathbf{x} - \left( \begin{array}{c} \mathbf{y}_0 \\ \mathbf{y}_1 \end{array} \right) = \left( \begin{array}{c} \mathbf{s} \\ \mathbf{0} \end{array} \right) \;\;, where :math:`D \mathbf{x} = \sum_m \mathbf{d}_m * \mathbf{x}_m` and .. math:: \Gamma_i = \left( \begin{array}{ccc} G_i & 0 & \ldots \\ 0 & G_i & \ldots \\ \vdots & \vdots & \ddots \end{array} \right) \;\;. After termination of the :meth:`solve` method, attribute :attr:`itstat` is a list of tuples representing statistics of each iteration. The fields of the named tuple ``IterationStats`` are: ``Iter`` : Iteration number ``ObjFun`` : Objective function value ``DFid`` : Value of data fidelity term :math:`(1/2) \| W (\sum_m \mathbf{d}_m * \mathbf{x}_m - \mathbf{s}) \|_1` ``RegL1`` : Value of regularisation term :math:`\sum_m \| \mathbf{x}_m \|_1` ``RegGrad`` : Value of regularisation term :math:`(1/2) \sum_i \sum_m \| G_i \mathbf{x}_m \|_2^2` ``PrimalRsdl`` : Norm of primal residual ``DualRsdl`` : Norm of dual residual ``EpsPrimal`` : Primal residual stopping tolerance :math:`\epsilon_{\mathrm{pri}}` ``EpsDual`` : Dual residual stopping tolerance :math:`\epsilon_{\mathrm{dua}}` ``Rho`` : Penalty parameter ``XSlvRelRes`` : Relative residual of X step solver ``Time`` : Cumulative run time """
[docs] class Options(ConvBPDNMaskDcpl.Options): r"""ConvL1L1Grd algorithm options Options include all of those defined in :class:`ConvBPDNMaskDcpl.Options`, together with additional options: ``GradWeight`` : An array of weights :math:`w_m` for the term penalising the gradient of the coefficient maps. If this option is defined, the gradient regularization term is :math:`\sum_i \sum_m w_m \| G_i \mathbf{x}_m \|_2^2` where :math:`w_m` is the weight for filter index :math:`m`. The array should be an :math:`M`-vector where :math:`M` is the number of filters in the dictionary. """ defaults = copy.deepcopy(ConvBPDNMaskDcpl.Options.defaults) defaults.update({'GradWeight': 1.0}) def __init__(self, opt=None): """ Parameters ---------- opt : dict or None, optional (default None) ConvL1L1Grd algorithm options """ if opt is None: opt = {} ConvBPDNMaskDcpl.Options.__init__(self, opt)
itstat_fields_objfn = ('ObjFun', 'DFid', 'RegL1', 'RegGrad') hdrtxt_objfn = ('Fnc', 'DFid', u('Regℓ1'), u('Regℓ2∇')) hdrval_objfun = {'Fnc': 'ObjFun', 'DFid': 'DFid', u('Regℓ1'): 'RegL1', u('Regℓ2∇'): 'RegGrad'} def __init__(self, D, S, lmbda, mu, W=None, opt=None, dimK=None, dimN=2): """ | **Call graph** .. image:: ../_static/jonga/cl1l1grd_init.svg :width: 20% :target: ../_static/jonga/cl1l1grd_init.svg | Parameters ---------- D : array_like Dictionary matrix S : array_like Signal vector or matrix lmbda : float Regularisation parameter (l1) mu : float Regularisation parameter (gradient) W : array_like Mask array. The array shape must be such that the array is compatible for multiplication with input array S (see :func:`.cnvrep.mskWshape` for more details). opt : :class:`ConvL1L1Grd.Options` object Algorithm options dimK : 0, 1, or None, optional (default None) Number of dimensions in input signal corresponding to multiple independent signals dimN : int, optional (default 2) Number of spatial dimensions """ if opt is None: opt = ConvL1L1Grd.Options() super(ConvL1L1Grd, self).__init__(D, S, lmbda, W, opt, dimK=dimK, dimN=dimN) self.mu = self.dtype.type(mu) if hasattr(opt['GradWeight'], 'ndim'): self.Wgrd = np.asarray(opt['GradWeight'].reshape((1,)*(dimN+2) + opt['GradWeight'].shape), dtype=self.dtype) else: self.Wgrd = np.asarray(opt['GradWeight'], dtype=self.dtype) self.Gf, GHGf = sl.GradientFilters(self.cri.dimN+3, self.cri.axisN, self.cri.Nv, dtype=self.dtype) self.GHGf = self.Wgrd * GHGf
[docs] def setdict(self, D=None): """Set dictionary array.""" if D is not None: self.D = np.asarray(D, dtype=self.dtype) self.Df = sl.rfftn(self.D, self.cri.Nv, self.cri.axisN) if self.opt['HighMemSolve'] and self.cri.Cd == 1: self.c = sl.solvedbd_sm_c( self.Df, np.conj(self.Df), (self.mu / self.rho) * self.GHGf + 1.0, self.cri.axisM) else: self.c = None
[docs] def xstep(self): r"""Minimise Augmented Lagrangian with respect to :math:`\mathbf{x}`. """ self.YU[:] = self.Y - self.U self.block_sep0(self.YU)[:] += self.S YUf = sl.rfftn(self.YU, None, self.cri.axisN) if self.cri.Cd == 1: b = np.conj(self.Df) * self.block_sep0(YUf) + self.block_sep1(YUf) else: b = sl.inner(np.conj(self.Df), self.block_sep0(YUf), axis=self.cri.axisC) + self.block_sep1(YUf) if self.cri.Cd == 1: self.Xf[:] = sl.solvedbd_sm( self.Df, (self.mu / self.rho) * self.GHGf + 1.0, b, self.c, self.cri.axisM) else: self.Xf[:] = sl.solvemdbi_ism( self.Df, (self.mu / self.rho) * self.GHGf + 1.0, b, self.cri.axisM, self.cri.axisC) self.X = sl.irfftn(self.Xf, self.cri.Nv, self.cri.axisN) if self.opt['LinSolveCheck']: Dop = lambda x: sl.inner(self.Df, x, axis=self.cri.axisM) if self.cri.Cd == 1: DHop = lambda x: np.conj(self.Df) * x else: DHop = lambda x: sl.inner(np.conj(self.Df), x, axis=self.cri.axisC) ax = DHop(Dop(self.Xf)) + ((self.mu / self.rho) * self.GHGf + 1.0) * self.Xf self.xrrs = sl.rrs(ax, b) else: self.xrrs = None
[docs] def ystep(self): r"""Minimise Augmented Lagrangian with respect to :math:`\mathbf{y}`. """ AXU = self.AX + self.U Y0 = sp.prox_l1(self.block_sep0(AXU) - self.S, (1.0/self.rho)*self.W) Y1 = sp.prox_l1(self.block_sep1(AXU), (self.lmbda/self.rho)*self.wl1) self.Y = self.block_cat(Y0, Y1) super(ConvBPDNMaskDcpl, self).ystep()
[docs] def eval_objfn(self): """Compute components of regularisation function as well as total contribution to objective function. """ g0v = self.obfn_g0(self.obfn_g0var()) g1v = self.obfn_g1(self.obfn_g1var()) rgr = sl.rfl2norm2(np.sqrt(self.GHGf * np.conj(self.Xf) * self.Xf), self.cri.Nv, self.cri.axisN)/2.0 obj = g0v + self.lmbda*g1v + self.mu*rgr return (obj, g0v, g1v, rgr)
[docs] def obfn_g0(self, Y0): r"""Compute :math:`g_0(\mathbf{y}_0)` component of ADMM objective function. """ return np.sum(np.abs(self.W * self.obfn_g0var()))
[docs] def rsdl_s(self, Yprev, Y): """Compute dual residual vector.""" return self.rho * self.cnst_AT(Yprev - Y)
[docs] def rsdl_sn(self, U): """Compute dual residual normalisation term.""" return self.rho * np.linalg.norm(self.cnst_AT(U))
[docs] def rhochange(self): """Updated cached c array when rho changes.""" if self.opt['HighMemSolve'] and self.cri.Cd == 1: self.c = sl.solvedbd_sm_c( self.Df, np.conj(self.Df), (self.mu / self.rho) * self.GHGf + 1.0, self.cri.axisM)
[docs]class MultiDictConvBPDN(object): r"""Solve a convolutional sparse coding problem fitting a single set of coefficient maps to multiple dictionaries and signals, e.g. .. math:: \mathrm{argmin}_\mathbf{x} \; (1/2) \left\| D_0 \mathbf{x} - \mathbf{s}_0 \right\|_2^2 + (1/2) \left\| D_1 \mathbf{x} - \mathbf{s}_1 \right\|_2^2 + \lambda \| \mathbf{x} \|_1 \;\;, for input images :math:`\mathbf{s}_0`, :math:`\mathbf{s}_1`, dictionaries :math:`D_0` and :math:`D_0`, and coefficient map set :math:`\mathbf{x}`, where :math:`D_0 \mathbf{x} = \sum_m \mathbf{d}_{0,m} \mathbf{x}_m` and :math:`D_1 \mathbf{x} = \sum_m \mathbf{d}_{1,m} \mathbf{x}_m`. Implemented as a wrapper about a :class:`ConvBPDN` or derived object (or any other object with sufficiently similar interface and internals). """ def __init__(self, cbpdnclass, D, S, *args, **kwargs): """ Parameters ---------- cbpdnclass : class name Type of internal cbpdn object (e.g. cbpdn.ConvBPDN) to be constructed D : tuple of array_like Set of dictionary arrays S : tuple of array_like Set of signal arrays *args Variable length list of arguments for constructor of internal cbpdn object **kwargs Keyword arguments for constructor of internal cbpdn object """ # Number of spatial dimensions if 'dimN' in kwargs: dimN = kwargs['dimN'] else: dimN = 2 # Number of channel dimensions in D[0] (should be the same for all) dimC = D[0].ndim - dimN - 1 # Number of filters in D[0] (should be the same for all) M = D[0].shape[-1] # Determine spatial and channel sizes of members of dictionary set if dimC == 0: chn = [1,] * len(D) else: chn = [D[b].shape[dimN] for b in range(0, len(D))] C = int(np.sum(np.asarray(chn))) dsz = np.asarray((0,) * dimN) for b in range(0, len(D)): dsz = np.maximum(dsz, np.asarray(D[b].shape[0:dimN])) # Construct single dictionary array with multiple dictionaries # stacked on the channel index Dm = np.zeros(tuple(dsz.tolist()) + (C,) + (M,)) chncs = np.cumsum(np.asarray([0,] + chn)) slc0 = (slice(None),)*dimN + (np.newaxis,)*(1-dimC) for b in range(0, len(D)): slc1 = tuple([slice(0, n) for n in D[b].shape[0:dimN]] + [slice(chncs[b], chncs[b+1])]) Dm[slc1] = D[b][slc0] # Construct single signal array (this is simpler since all # members of the signal set are assumed to be of the same # size) Sm = np.concatenate([S[b][slc0] for b in range(0, len(S))], dimN+dimC) # Construct inner cbpdn object self.cbpdn = cbpdnclass(Dm, Sm, *args, **kwargs) # Record some problem parameters self.dimN = dimN self.chn = chn self.chncs = chncs self.C = C
[docs] def solve(self): """Call the solve method of the inner cbpdn object and return the result. """ # Call solve method of inner cbpdn object Xi = self.cbpdn.solve() # Copy attributes from inner cbpdn object self.timer = self.cbpdn.timer self.itstat = self.cbpdn.itstat # Return result of inner cbpdn object return Xi
[docs] def getcoef(self): """Call the getcoef method if the inner cbpdn object.""" return self.cbpdn.getcoef()
[docs] def getitstat(self): """Get iteration stats from inner cbpdn object.""" return self.cbpdn.getitstat()
[docs] def reconstruct(self, b, X=None): """Reconstruct representation of signal b in signal set.""" if X is None: X = self.getcoef() Xf = sl.rfftn(X, None, self.cbpdn.cri.axisN) slc = (slice(None),)*self.dimN + \ (slice(self.chncs[b], self.chncs[b+1]),) Sf = np.sum(self.cbpdn.Df[slc] * Xf, axis=self.cbpdn.cri.axisM) return sl.irfftn(Sf, self.cbpdn.cri.Nv, self.cbpdn.cri.axisN)