# -*- coding: utf-8 -*-
# Copyright (C) 2015-2020 by Brendt Wohlberg <brendt@ieee.org>
# All rights reserved. BSD 3-clause License.
# This file is part of the SPORCO package. Details of the copyright
# and user license can be found in the 'LICENSE.txt' file distributed
# with the package.
"""Classes for ADMM algorithms for Robust PCA optimisation"""
from __future__ import division, absolute_import
import copy
import numpy as np
from sporco.admm import admm
import sporco.prox as sp
from sporco.util import u
__author__ = """Brendt Wohlberg <brendt@ieee.org>"""
[docs]
class RobustPCA(admm.ADMM):
r"""ADMM algorithm for Robust PCA problem :cite:`candes-2011-robust`
:cite:`cai-2010-singular`.
Solve the optimisation problem
.. math::
\mathrm{argmin}_{X, Y} \;
\| X \|_* + \lambda \| Y \|_1 \quad \text{such that}
\quad X + Y = S \;\;.
This problem is unusual in that it is already in ADMM form without
the need for any variable splitting.
After termination of the :meth:`solve` method, attribute :attr:`itstat` is
a list of tuples representing statistics of each iteration. The
fields of the named tuple ``IterationStats`` are:
``Iter`` : Iteration number
``ObjFun`` : Objective function value
``NrmNuc`` : Value of nuclear norm term :math:`\| X \|_*`
``NrmL1`` : Value of :math:`\ell_1` norm term :math:`\| Y \|_1`
``Cnstr`` : Constraint violation :math:`\| X + Y - S\|_2`
``PrimalRsdl`` : Norm of primal residual
``DualRsdl`` : Norm of dual residual
``EpsPrimal`` : Primal residual stopping tolerance
:math:`\epsilon_{\mathrm{pri}}`
``EpsDual`` : Dual residual stopping tolerance
:math:`\epsilon_{\mathrm{dua}}`
``Rho`` : Penalty parameter
``Time`` : Cumulative run time
"""
[docs]
class Options(admm.ADMM.Options):
"""RobustPCA algorithm options
Options include all of those defined in
:class:`sporco.admm.admm.ADMM.Options`, together with
an additional option:
``fEvalX`` : Flag indicating whether the :math:`f` component
of the objective function should be evaluated using variable
X (``True``) or Y (``False``) as its argument.
``gEvalY`` : Flag indicating whether the :math:`g` component
of the objective function should be evaluated using variable
Y (``True``) or X (``False``) as its argument.
"""
defaults = copy.deepcopy(admm.ADMM.Options.defaults)
defaults.update({'gEvalY': True, 'fEvalX': True, 'RelaxParam': 1.8})
defaults['AutoRho'].update({'Enabled': True, 'Period': 1,
'AutoScaling': True, 'Scaling': 1000.0,
'RsdlRatio': 1.2})
def __init__(self, opt=None):
"""
Parameters
----------
opt : dict or None, optional (default None)
RobustPCA algorithm options
"""
if opt is None:
opt = {}
admm.ADMM.Options.__init__(self, opt)
if self['AutoRho', 'RsdlTarget'] is None:
self['AutoRho', 'RsdlTarget'] = 1.0
itstat_fields_objfn = ('ObjFun', 'NrmNuc', 'NrmL1', 'Cnstr')
hdrtxt_objfn = ('Fnc', 'NrmNuc', u('Nrmℓ1'), 'Cnstr')
hdrval_objfun = {'Fnc': 'ObjFun', 'NrmNuc': 'NrmNuc',
u('Nrmℓ1'): 'NrmL1', 'Cnstr': 'Cnstr'}
def __init__(self, S, lmbda=None, opt=None):
"""
Parameters
----------
S : array_like
Signal vector or matrix
lmbda : float
Regularisation parameter
opt : RobustPCA.Options object
Algorithm options
"""
if opt is None:
opt = RobustPCA.Options()
# Set dtype attribute based on S.dtype and opt['DataType']
self.set_dtype(opt, S.dtype)
# Set default lambda value if not specified
if lmbda is None:
lmbda = S.shape[0] ** -0.5
self.lmbda = self.dtype.type(lmbda)
# Set penalty parameter
self.set_attr('rho', opt['rho'], dval=(2.0*self.lmbda + 0.1),
dtype=self.dtype)
Nx = S.size
super(RobustPCA, self).__init__(Nx, S.shape, S.shape, S.dtype, opt)
self.S = np.asarray(S, dtype=self.dtype)
[docs]
def uinit(self, ushape):
"""Return initialiser for working variable U"""
if self.opt['Y0'] is None:
return np.zeros(ushape, dtype=self.dtype)
else:
# If initial Y is non-zero, initial U is chosen so that
# the relevant dual optimality criterion (see (3.10) in
# boyd-2010-distributed) is satisfied.
return (self.lmbda/self.rho)*np.sign(self.Y)
[docs]
def solve(self):
"""Start (or re-start) optimisation."""
super(RobustPCA, self).solve()
return self.X, self.Y
[docs]
def xstep(self):
r"""Minimise Augmented Lagrangian with respect to
:math:`\mathbf{x}`.
"""
self.X, self.ss = sp.prox_nuclear(self.S - self.Y - self.U,
1/self.rho)
[docs]
def ystep(self):
r"""Minimise Augmented Lagrangian with respect to
:math:`\mathbf{y}`.
"""
self.Y = np.asarray(sp.prox_l1(self.S - self.AX - self.U,
self.lmbda/self.rho), dtype=self.dtype)
[docs]
def obfn_fvar(self):
"""Variable to be evaluated in computing regularisation term,
depending on 'fEvalX' option value.
"""
if self.opt['fEvalX']:
return self.X
else:
return self.cnst_c() - self.cnst_B(self.Y)
[docs]
def obfn_gvar(self):
"""Variable to be evaluated in computing regularisation term,
depending on 'gEvalY' option value.
"""
if self.opt['gEvalY']:
return self.Y
else:
return self.cnst_c() - self.cnst_A(self.X)
[docs]
def eval_objfn(self):
"""Compute components of objective function as well as total
contribution to objective function.
"""
if self.opt['fEvalX']:
rnn = np.sum(self.ss)
else:
rnn = sp.norm_nuclear(self.obfn_fvar())
rl1 = np.sum(np.abs(self.obfn_gvar()))
cns = np.linalg.norm(self.X + self.Y - self.S)
obj = rnn + self.lmbda*rl1
return (obj, rnn, rl1, cns)
[docs]
def cnst_A(self, X):
r"""Compute :math:`A \mathbf{x}` component of ADMM problem
constraint. In this case :math:`A \mathbf{x} = \mathbf{x}`.
"""
return X
[docs]
def cnst_AT(self, X):
r"""Compute :math:`A^T \mathbf{x}` where :math:`A \mathbf{x}` is
a component of ADMM problem constraint. In this case
:math:`A^T \mathbf{x} = \mathbf{x}`.
"""
return X
[docs]
def cnst_B(self, Y):
r"""Compute :math:`B \mathbf{y}` component of ADMM problem
constraint. In this case :math:`B \mathbf{y} = -\mathbf{y}`.
"""
return Y
[docs]
def cnst_c(self):
r"""Compute constant component :math:`\mathbf{c}` of ADMM problem
constraint. In this case :math:`\mathbf{c} = \mathbf{s}`.
"""
return self.S